
Data Collectors

This module provides the full data collection pipeline.

run_pipeline(years_to_process, inmet_url, save_path, stage_path, output_path, stations_file, weather_file, stations_column_names, weather_column_names, stations_schema, weather_schema)

Execute the data collection pipeline for specified years.

This function manages the workflow of downloading, extracting, processing, and saving weather station data. It iterates over each year provided, downloading and processing the relevant data files.

Parameters

years_to_process : List[int]
    A list of years to process.
inmet_url : str
    The base URL from which the data files are downloaded.
save_path : str
    The local folder where downloaded zip files are saved.
stage_path : str
    The path to the folder where extracted data is temporarily stored.
output_path : str
    The path to the folder where processed data files are stored.
stations_file : str
    The name of the output file for processed station data.
weather_file : str
    The name of the output file for processed weather data.
stations_column_names : Dict[str, str]
    Mapping of original station column names to desired column names.
weather_column_names : Dict[str, str]
    Mapping of original weather column names to desired column names.
stations_schema
    The Pydantic schema used to validate station data.
weather_schema
    The Pydantic schema used to validate weather data.

Returns

None
    This function does not return anything; it processes and saves data to the specified locations.

Raises

Exception
    If any step in the process (downloading, extracting, processing, or saving) fails, the exception raised by the underlying function propagates to the caller.
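A minimal usage sketch follows. The INMET URL, folder layout, output file names, column mappings, and schema class names below are illustrative assumptions, not values taken from the project:

```python
from typing import Dict, List

# Illustrative column mappings; the real INMET headers and target names
# are defined by the project's configuration, not here.
stations_column_names: Dict[str, str] = {
    "ESTACAO": "station_name",
    "CODIGO (WMO)": "station_code",
}
weather_column_names: Dict[str, str] = {
    "DATA (YYYY-MM-DD)": "date",
    "PRECIPITACAO TOTAL, HORARIO (mm)": "precipitation_mm",
}

years_to_process: List[int] = [2022, 2023]

# The call itself is commented out because it downloads data over the
# network; StationSchema and WeatherSchema stand in for the project's
# actual Pydantic schemas.
# run_pipeline(
#     years_to_process=years_to_process,
#     inmet_url="https://portal.inmet.gov.br/uploads/dadoshistoricos/",
#     save_path="data/raw",
#     stage_path="data/stage",
#     output_path="data/output",
#     stations_file="stations.parquet",
#     weather_file="weather.parquet",
#     stations_column_names=stations_column_names,
#     weather_column_names=weather_column_names,
#     stations_schema=StationSchema,
#     weather_schema=WeatherSchema,
# )
```

Passing keyword arguments, as above, avoids positional mix-ups given the long parameter list.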

Source code in app/tools/pipeline.py
@logger_decorator
def run_pipeline(
    years_to_process: List[int],
    inmet_url: str,
    save_path: str,
    stage_path: str,
    output_path: str,
    stations_file: str,
    weather_file: str,
    stations_column_names: Dict[str, str],
    weather_column_names: Dict[str, str],
    stations_schema,
    weather_schema,
) -> None:
    """
    Execute the data collection pipeline for specified years.

    This function manages the workflow of downloading, extracting, processing,
    and saving weather station data. It iterates over each year provided,
    downloading and processing the relevant data files.

    Parameters
    ----------
    years_to_process : List[int]
        A list of years to process.
    inmet_url : str
        The base URL from which the data files are downloaded.
    save_path : str
        The local folder where downloaded zip files are saved.
    stage_path : str
        The path to the folder where extracted data is temporarily stored.
    output_path : str
        The path to the folder where processed data files are stored.
    stations_file : str
        The name of the output file for processed station data.
    weather_file : str
        The name of the output file for processed weather data.
    stations_column_names : Dict[str, str]
        Mapping of original station column names to desired column names.
    weather_column_names : Dict[str, str]
        Mapping of original weather column names to desired column names.
    stations_schema
        The Pydantic schema used to validate station data.
    weather_schema
        The Pydantic schema used to validate weather data.

    Returns
    -------
    None
        This function does not return anything. It processes and saves data
        to the specified locations.

    Raises
    ------
    Exception
        If any step in the process (downloading, extracting, processing,
        or saving) fails, the exception raised by the underlying function
        propagates to the caller.
    """
    years = collect_years_list(years_to_process)
    logger.info(f"Starting data extraction for years {years}")

    for year in years:
        file_name = f"{year}.zip"

        logger.info(f"Beginning download of data from {year}")
        download_file(inmet_url, file_name, save_path)

        logger.info(f"Beginning unzip of data from {year}")
        extract_zip(save_path, file_name, stage_path)

        logger.info(f"Finished processing {year}")

    logger.info("Starting processing stations data")
    stations_data = StationDataCollector(
        stage_path,
        output_path,
        stations_file,
        stations_column_names,
        stations_schema,
    )
    stations_data.start()

    logger.info("Starting processing weather data")
    weather_data = WeatherDataCollector(
        stage_path,
        output_path,
        weather_file,
        weather_column_names,
        weather_schema,
    )
    weather_data.start()

    logger.info("Cleaning temp folders")
    clear_folder(save_path)
    clear_folder(stage_path)

    logger.info("Finished the pipeline!")