
download_data

convert_rda_to_csv(root, dataset_name)

Convert RDA stimulus files to CSV for datasets that need it (currently only MECO L2).

Source code in src/data/preprocessing/download_data.py
def convert_rda_to_csv(root: Path, dataset_name: str) -> None:
    """Convert RDA files to CSV for specific datasets."""
    if dataset_name != DataSets.MECO_L2:
        return
    rda_path = root / 'MECOL2/stimuli/texts.meco.l2.rda'
    csv_path = root / 'MECOL2/stimuli/stimuli.csv'

    if csv_path.exists():
        logger.info(f'{csv_path} already exists. Skipping conversion...')
        return

    if not rda_path.exists():
        logger.warning(f'{rda_path} not found. Skipping conversion...')
        return

    logger.info(f'Converting {rda_path} to {csv_path}')
    rda_data = rdata.read_rda(rda_path)
    df = rda_data['d']
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(csv_path, index=False)
    logger.info(f'Saved stimuli CSV to {csv_path}')
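
A minimal usage sketch. It assumes the module is importable as src.data.preprocessing.download_data (matching the source path above) and that DataSets is reachable through that module; the data root below is illustrative:

from pathlib import Path

from src.data.preprocessing.download_data import DataSets, convert_rda_to_csv

root = Path('data')  # illustrative data root

# Converts MECOL2/stimuli/texts.meco.l2.rda to MECOL2/stimuli/stimuli.csv
# under root; a no-op for any other dataset name or if the CSV already exists.
convert_rda_to_csv(root, DataSets.MECO_L2)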

download_auxiliary_files(root, dataset_name)

Download auxiliary resources for a dataset that are not covered by DatasetLibrary.

Source code in src/data/preprocessing/download_data.py
def download_auxiliary_files(root: Path, dataset_name: str) -> None:
    """Download auxiliary resources not covered by DatasetLibrary for a specific dataset."""
    if dataset_name not in AUXILIARY_FILES:
        return

    for relative_path, resource_id in AUXILIARY_FILES[dataset_name].items():
        destination = root / relative_path
        if destination.exists():
            logger.info(
                f'{relative_path} already present at {destination}. Continuing...'
            )
            continue

        url = f'{BASE_OSF_URL}{resource_id}'
        logger.info(f'Downloading {relative_path} from {url}')
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()

        destination.parent.mkdir(parents=True, exist_ok=True)
        with open(destination, 'wb') as fp:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    fp.write(chunk)
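
A minimal usage sketch under the same import assumptions. As the loop above shows, AUXILIARY_FILES maps a dataset name to {relative_path: OSF resource id} pairs, so a dataset without an entry returns immediately:

from pathlib import Path

from src.data.preprocessing.download_data import DataSets, download_auxiliary_files

root = Path('data')  # illustrative data root

# Streams each missing auxiliary file from OSF into root / relative_path;
# files already present on disk are skipped.
download_auxiliary_files(root, DataSets.MECO_L2)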

load_or_download_dataset(dataset_name, data_path, download=False)

Load or download a dataset based on the download flag.

Source code in src/data/preprocessing/download_data.py
def load_or_download_dataset(
    dataset_name: str, data_path: Path, download: bool = False
) -> None:
    """Load or download a dataset based on the flag."""
    if dataset_name == DataSets.MECO_L2:
        dataset_def_w1 = prepare_dataset_definition(f'{dataset_name}W1')
        dataset_def_w2 = prepare_dataset_definition(f'{dataset_name}W2')
        dataset_w1 = pm.Dataset(dataset_def_w1, data_path / DataSets.MECO_L2W1)
        dataset_w2 = pm.Dataset(dataset_def_w2, data_path / DataSets.MECO_L2W2)
        if download:
            dataset_w1.download()
            dataset_w2.download()
        else:
            dataset_w1.load()
            dataset_w2.load()
    else:
        dataset_def = prepare_dataset_definition(dataset_name)
        dataset = pm.Dataset(dataset_def, data_path / dataset_name)
        if download:
            dataset.download()
        else:
            dataset.load()
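
A minimal usage sketch under the same import assumptions. A typical pattern is to download once and load from disk on later runs:

from pathlib import Path

from src.data.preprocessing.download_data import DataSets, load_or_download_dataset

data_path = Path('data')  # illustrative data root

# First run: download (for MECO L2 this fetches both waves, W1 and W2).
load_or_download_dataset(DataSets.MECO_L2, data_path, download=True)

# Later runs: load the already-downloaded data.
load_or_download_dataset(DataSets.MECO_L2, data_path)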

prepare_dataset_definition(dataset_name)

Prepare a dataset definition with gaze resources filtered out.

Source code in src/data/preprocessing/download_data.py
def prepare_dataset_definition(dataset_name: str):
    """Prepare dataset definition with gaze files disabled."""
    dataset_def = pm.DatasetLibrary.get(dataset_name)
    dataset_def.resources = ResourceDefinitions(
        [resource for resource in dataset_def.resources if resource.content != 'gaze']
    )

    return dataset_def
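
A minimal usage sketch under the same import assumptions, mirroring the f'{dataset_name}W1' key construction used in load_or_download_dataset above:

from src.data.preprocessing.download_data import DataSets, prepare_dataset_definition

dataset_def = prepare_dataset_definition(f'{DataSets.MECO_L2}W1')

# Every remaining resource is non-gaze; gaze recordings will not be downloaded.
assert all(resource.content != 'gaze' for resource in dataset_def.resources)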