dsa_tdb.utils module

dsa_tdb.utils.arra2vals(arr: set | list) → str | None

Function to convert a set or list of values into its string representation.

Parameters:

arr ([set,list]) – The set of values.

Returns:

The string representation of the set. The format is '["AAA","BBB"]'. If the input is None or an empty list, it returns None.

Return type:

str, None
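A minimal sketch of the documented behaviour (the helper name is hypothetical; the real implementation may differ):

```python
import json

def arra2vals_sketch(arr):
    # Return None for None or empty collections, as documented.
    if not arr:
        return None
    # Sort for a deterministic order when a set is passed;
    # compact separators yield the documented '["AAA","BBB"]' format.
    return json.dumps(sorted(arr), separators=(',', ':'))
```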

dsa_tdb.utils.check_sha1_sum(local_filename: str | Path, local_sha1_filename: str | Path | None = None, BUF_SIZE: int = 65536)

Function to check the sha1 of a file.

Parameters:
  • local_filename (str) – The path of the file to check.

  • local_sha1_filename (str, optional) – The path of the sha1 file. If None, local_filename + '.sha1' is used, by default None

  • BUF_SIZE (int, optional) – The buffer size in bytes to use when computing the sha1, by default 65536

Raises:
  • AssertionError – If the sha1 is not correct.

  • ValueError – If the sha1 file does not exist.
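The check described above can be sketched with the standard hashlib module (hypothetical helper name; a sketch of the documented behaviour, not the actual implementation):

```python
import hashlib
from pathlib import Path

def check_sha1_sketch(local_filename, local_sha1_filename=None, buf_size=65536):
    # Default the .sha1 companion file next to the checked file, as documented.
    if local_sha1_filename is None:
        local_sha1_filename = str(local_filename) + '.sha1'
    if not Path(local_sha1_filename).exists():
        raise ValueError(f'sha1 file not found: {local_sha1_filename}')
    sha1 = hashlib.sha1()
    with open(local_filename, 'rb') as f:
        # Read in buf_size-byte blocks to keep memory bounded.
        while chunk := f.read(buf_size):
            sha1.update(chunk)
    expected = Path(local_sha1_filename).read_text().split()[0].strip()
    assert sha1.hexdigest() == expected, 'sha1 mismatch'
```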

dsa_tdb.utils.close_logger(logger, logger_process, comm_queue)

Closes the logger process and waits for it to finish.

Parameters:
  • logger (Logger) – The logger instance.

  • logger_process (Process) – The process managing the logger.

  • comm_queue (Queue) – The queue used for logging.

dsa_tdb.utils.compare_sha1_file_content(filename_a: str | Path, filename_b: str | Path) → bool

Function to compare the sha1 of two files.

Parameters:
  • filename_a (str) – The path of the first file.

  • filename_b (str) – The path of the second file.

Returns:

True if the sha1 of the two files is the same, False otherwise.

Return type:

bool

dsa_tdb.utils.compute_end_date(end_date: str | None | datetime | date) → date
dsa_tdb.utils.compute_files_to_process(root_folder: str, out_file_name: str, platform: str, version: TDB_dailyDumpsVersion = TDB_dailyDumpsVersion.full, input_format: TDB_chunkFormat = TDB_chunkFormat.parquet, output_format: AggregateFileFormat = AggregateFileFormat.parquet, local_chunked_subfolder: str = 'daily_dumps_chunked', write_mode: AggregateWriteMode = AggregateWriteMode.overwrite, start_date: str | None = None, end_date: str | None = None, step_name: str = 'analysis') → Tuple[list, AggregateWriteMode, str, str, str, DataFrame]

Function to compute the files to process.

Parameters:
  • root_folder (str) – The root folder where the platform___version folders are stored.

  • local_chunked_subfolder (str) – The subfolder where the chunked files are stored.

  • out_file_name (str) – The name of the file to write, in the format 'nameof_file.{}'.

  • platform (str) – The platform name.

  • version (TDB_dailyDumpsVersion) – The version of the files.

  • input_format (TDB_chunkFormat) – The input file format.

  • output_format (AggregateFileFormat) – The output file format.

  • write_mode (AggregateWriteMode, optional) – The write mode, by default AggregateWriteMode.overwrite

  • start_date (str, optional) – The start date, in 'YYYY-MM-DD' format. Defaults to None, which means no filtering.

  • end_date (str, optional) – The end date, in 'YYYY-MM-DD' format. Defaults to None, which means no filtering.

  • step_name (str) – The name of the step, usually 'aggregate' or 'filter'.

Returns:

The list of files to process, the write mode, the output file name, the dates-files filename, the output configuration filename, and the DataFrame of the dates files to remove. Since 0.7.1 it also returns the list of wrong_sha1_dates (days that were filtered/aggregated from an inconsistent dump). Downstream functions should remove these dates from the output, because they will be replaced by the re-processed data. This function takes care of updating the dates files to remove the offending lines.

Return type:

List[str], AggregateWriteMode, str, str, str, pd.DataFrame, List[date]

dsa_tdb.utils.compute_start_date(start_date: str | None | datetime | date) → date
dsa_tdb.utils.countWithNans(g)
dsa_tdb.utils.fifo_pool(num_processes: int, foo, args_list, logging_level: int = 20)
dsa_tdb.utils.generate_dates_files_df(files_to_process: List[str], input_root_folder: str)
dsa_tdb.utils.get_files_to_load(root_folder: str, platform: str, version: TDB_dailyDumpsVersion = TDB_dailyDumpsVersion.full, local_chunked_subfolder: str = 'daily_dumps_chunked', input_format: TDB_chunkFormat = TDB_chunkFormat.parquet, start_date: str | date | datetime | None = None, end_date: str | datetime | None = None) → List[str]
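compute_start_date and compute_end_date above accept strings, datetimes, dates, or None and return a date. The conversion logic can be sketched as follows (hypothetical helper name; the fallback used for None is an assumption, since the real defaults are not documented here):

```python
from datetime import date, datetime

def to_date_sketch(value, default):
    # None falls back to a caller-supplied default date (assumption).
    if value is None:
        return default
    # Check datetime before date: every datetime is also a date instance.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    # Strings are expected in 'YYYY-MM-DD' format.
    return datetime.strptime(value, '%Y-%m-%d').date()
```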
dsa_tdb.utils.read_sha1_file(file_path: str | Path) → str

Function to get the sha1 signature from a .sha1 file.

Parameters:

file_path (str) – The path of the file.

Returns:

The sha1 of the file.

Return type:

str
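A one-line sketch of the extraction (hypothetical helper name; assumes the common `<hexdigest>  <filename>` layout of .sha1 files):

```python
from pathlib import Path

def read_sha1_sketch(file_path):
    # .sha1 files typically contain '<hexdigest>  <filename>'; keep only the digest.
    return Path(file_path).read_text().split()[0].strip()
```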

dsa_tdb.utils.sanitize_platform_name(platform_name: str, warn_on_change: bool = False) → str

Function to sanitize the platform name.

Parameters:

platform_name (str) – The platform name.

Returns:

The sanitized platform name.

Return type:

str
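The exact sanitization rules are not documented here; a plausible sketch (purely an assumption: lowercase, with runs of non-alphanumeric characters collapsed to underscores) would be:

```python
import re

def sanitize_platform_name_sketch(platform_name):
    # Assumption: collapse whitespace/punctuation runs to underscores,
    # trim stray underscores, and lowercase the result.
    return re.sub(r'[^0-9a-zA-Z]+', '_', platform_name.strip()).strip('_').lower()
```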

dsa_tdb.utils.spark_session_factory(app_name: str = 'dsa_tdb', memory_limit: str | None = None, n_workers: int | None = None, spark_local_dir: str | None = None, options: dict | None = None) → SparkSession

Function to create a Spark session. Note that the Spark master URL can be set using the environment variable SPARK_MASTER_URL. For instance, to use the local mode, you can set SPARK_MASTER_URL=local[*] before calling this function:

```python
import os
os.environ['SPARK_MASTER_URL'] = 'local[*]'
```

Parameters:
  • app_name (str, optional) – The name of the application, by default 'dsa_tdb'

  • memory_limit (str, optional) – The memory limit to use, by default None

  • n_workers (int, optional) – The number of workers to use, by default None. If None or <= 0, all available cores are used.

  • spark_local_dir (str, optional) – The local directory to use, by default None

  • options (dict, optional) – Additional options to pass to the Spark session, by default None. Must be a dictionary of key-value pairs.

Returns:

The Spark session.

Return type:

SparkSession

dsa_tdb.utils.terminate_if_failure(processes, except_queue, logger, logger_process, comm_queue)

Terminates all remaining processes and closes the logger if an exception occurred in a worker process.

Parameters:
  • processes (list) – The list of running processes.

  • except_queue (Queue) – The queue where exceptions are stored.

  • logger (Logger) – The logger instance.

  • logger_process (Process) – The process managing the logger.

  • comm_queue (Queue) – The queue used for logging.

Raises:

Exception – The exception that occurred in a worker process.
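The re-raise logic described above can be sketched with a plain queue (hypothetical helper name; the real function also terminates the running processes and closes the logger, which is omitted here):

```python
import queue

def failure_sketch(except_queue):
    # If a worker pushed an exception onto the queue, re-raise it in the parent;
    # otherwise report that no failure occurred.
    try:
        exc = except_queue.get_nowait()
    except queue.Empty:
        return None
    raise exc
```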

dsa_tdb.utils.territories2label(ts: str) → str

Function to transform the territories (countries) string into a single label.

dsa_tdb.utils.vals2arra(s: str | None, as_set: bool = False) → set | list

Function to parse the array string encoding into a set or list of values. Assumes the form "[AAA,BBB]" or "AAA".

Parameters:
  • s (str) – The string to parse. The format is "[AAA,BBB]", "["AAA","BBB"]" or "AAA"; spaces and double quotes will be removed.

  • as_set (bool, optional) – Whether to return a set or a list, by default False

Returns:

The set (or list) of values. If the input is not a string, is the string `<NA>`, or is None, an empty list (or set) is returned.

Return type:

[set,list]
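The inverse of arra2vals can be sketched as follows (hypothetical helper name; a sketch of the documented parsing rules, not the actual implementation):

```python
def vals2arra_sketch(s, as_set=False):
    # Non-string, '<NA>' or None inputs yield an empty collection, as documented.
    if not isinstance(s, str) or s == '<NA>':
        return set() if as_set else []
    # Strip brackets, double quotes and spaces, then split on commas.
    cleaned = s.strip().strip('[]').replace('"', '').replace(' ', '')
    vals = [v for v in cleaned.split(',') if v]
    return set(vals) if as_set else vals
```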