dsa_tdb.etl module

dsa_tdb.etl.aggregate_SoRs(spark: SparkSession, df: DataFrame, columns_to_group: List[RawAndExplodedColumns], horizontally_exploded_columns: bool = False, delete_original_columns: bool = False, normalize_platform_name: bool = False, platforms_to_exclude: List[str] | None = None, created_at_dt_floor: str = 'day', config_file: str | None = None, **kwargs) DataFrame

Aggregates the SoRs (statements of reasons) from the dataframe.

Parameters:
  • spark (SparkSession) – The spark session handle.

  • df (DataFrame) – The input dataframe.

  • columns_to_group (List[RawAndExplodedColumns]) – The columns to group by.

  • horizontally_exploded_columns (bool, optional) – Whether to horizontally explode the columns or not, by default False

  • delete_original_columns (bool, optional) – Whether to delete the original columns after horizontally exploding them, by default False

  • normalize_platform_name (bool, optional) – Whether to coalesce the platform name when a given platform's name changed over time, by default False. See dsa_tdb.types.coalesce_platforms_names for details.

  • platforms_to_exclude (Union[List[str],None], optional) – The platforms to exclude, by default None

  • created_at_dt_floor (str, optional) – The date floor for the created_at column, by default 'day'. See the [spark documentation for the date_trunc function](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_trunc.html) for other possible values.

  • config_file (str, optional) – The configuration file, by default None.

  • **kwargs (dict) – The aggregation arguments. These are all the remaining entries of dsa_tdb.types.AggregationConfig that are not directly exposed in the function arguments.

Returns:

The aggregated dataframe.

Return type:

DataFrame
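
Conceptually, the aggregation floors the created_at timestamp and counts SoRs per group of the selected columns. A minimal pure-Python sketch of that logic (illustrative only, with hypothetical record names; not the dsa_tdb implementation):

```python
from collections import Counter
from datetime import datetime

def floor_to_day(ts: datetime) -> datetime:
    # Stand-in for Spark's date_trunc('day', ...), as used by created_at_dt_floor.
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)

records = [
    {"platform_name": "PlatformA", "created_at": datetime(2024, 1, 1, 10, 30)},
    {"platform_name": "PlatformA", "created_at": datetime(2024, 1, 1, 18, 5)},
    {"platform_name": "PlatformB", "created_at": datetime(2024, 1, 2, 3, 0)},
]

# Group by (platform_name, floored created_at) and count SoRs per group.
counts = Counter(
    (r["platform_name"], floor_to_day(r["created_at"])) for r in records
)
```

Here both PlatformA rows fall on the same floored day and are counted together, while the PlatformB row forms its own group.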

dsa_tdb.etl.filter_SoRs(spark: SparkSession, df: DataFrame, columns_to_import: List[TDB_columnsFull] | None = None, horizontally_explode_columns: bool = True, delete_original_columns: bool = False, normalize_platform_name: bool = False, platforms_to_exclude: List[str] | None = None, platforms_to_include: List[str] | None = None, created_at_dt_floor: str | None = None, config_file: str | None = None, **kwargs) DataFrame

Filters the SoRs from the dataframe. The configuration can be passed either using the provided and additional keyword arguments or by providing a configuration file in config_file. Note that if both are provided, the keyword arguments will take precedence.

Parameters:
  • spark (SparkSession) – The spark session handle.

  • df (DataFrame) – The input dataframe.

  • columns_to_import (list, optional) – The columns to select from the df before exploding them, by default None, in which case all the columns are imported.

  • horizontally_explode_columns (bool, optional) – Whether to horizontally explode the columns or not, by default True.

  • delete_original_columns (bool, optional) – Whether to delete the original columns after exploding them, by default False.

  • normalize_platform_name (bool, optional) – Whether to normalize the platform name, by default False.

  • platforms_to_exclude (list, optional) – The platforms to exclude, by default None.

  • platforms_to_include (list, optional) – The platforms to include, by default None.

  • created_at_dt_floor (str, optional) – The datetime floor for the created_at column, by default None.

  • config_file (str, optional) – The configuration file, by default None.

  • **kwargs (dict) – The filter arguments. These are all the remaining entries of dsa_tdb.types.FilteringConfig that are not directly exposed in the function arguments.

Returns:

The filtered dataframe.

Return type:

DataFrame

dsa_tdb.etl.loadDataset(df: DataFrame, spark: SparkSession, del_original: bool = True, explode_cols: bool = True, fillna_str: str | None = None, fillna_bool: bool | None = False, columns_to_fill_str: list = ['decision_monetary', 'decision_provision', 'decision_account', 'incompatible_content_illegal', 'content_language', 'territorial_scope', 'automated_decision', 'automated_detection'], columns_to_fill_bool: list = ['DECISION_VISIBILITY_CONTENT_REMOVED', 'DECISION_VISIBILITY_CONTENT_DISABLED', 'DECISION_VISIBILITY_CONTENT_DEMOTED', 'DECISION_VISIBILITY_CONTENT_AGE_RESTRICTED', 'DECISION_VISIBILITY_CONTENT_INTERACTION_RESTRICTED', 'DECISION_VISIBILITY_CONTENT_LABELLED', 'DECISION_VISIBILITY_OTHER'], compute_time_to_action: bool = True, compute_restriction_duration: bool = True, normalize_platform_name: bool = False, normalize_content_type_other: bool = True) DataFrame

The actual ETL step. For each column to explode, it creates one column per possible value and fills it with a boolean flag checking for that value.

Operates on the input df without copying it.

Parameters:
  • df (pyspark.sql.DataFrame) – The input dataframe.

  • spark (SparkSession) – The spark session handle.

  • del_original (bool, optional) – Whether to delete the original columns after exploding them, by default True.

  • explode_cols (bool, optional) – Whether to explode the columns or not, by default True. When exploding the columns, dsa_tdb.types.columns_to_explode is used.

  • fillna_str (str, optional) – The value to fill the string columns with, by default None.

  • fillna_bool (bool, optional) – The value to fill the bool columns with, by default False

  • columns_to_fill_str (list, optional) –

    The list of columns to fill with fillna_str, by default:

    ['decision_monetary', 'decision_provision', 'decision_account', 'incompatible_content_illegal', 'content_language', 'territorial_scope', 'automated_decision', 'automated_detection']

  • columns_to_fill_bool (list, optional) – The list of columns to fill with fillna_bool, by default dsa_tdb.types.columns_to_explode['decision_visibility'].

  • compute_time_to_action (bool, optional) – Whether to compute the time to action and reporting, by default True

  • compute_restriction_duration (bool, optional) – Whether to compute the restriction duration time when available, by default True

  • normalize_platform_name (bool, optional) – Whether to coalesce the platform name when a given platform's name changed over time, by default False. See dsa_tdb.types.coalesce_platforms_names for details.

  • normalize_content_type_other (bool, optional) – Whether to normalize the content type other column, by default True. See dsa_tdb.types.CONTENT_TYPE_OTHER_NORMALIZATION for details.

Returns:

The dataframe with the exploded columns.

Return type:

DataFrame
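
The fill-NA step described above can be sketched in plain Python over dict records: string columns receive fillna_str, boolean columns receive fillna_bool. A simplified stand-in for the Spark fillna calls, with illustrative names:

```python
def fill_record(record: dict,
                columns_to_fill_str: list,
                columns_to_fill_bool: list,
                fillna_str: str = "N/A",
                fillna_bool: bool = False) -> dict:
    # Replace missing (None) entries column-by-column, leaving present values intact.
    filled = dict(record)
    for col in columns_to_fill_str:
        if filled.get(col) is None:
            filled[col] = fillna_str
    for col in columns_to_fill_bool:
        if filled.get(col) is None:
            filled[col] = fillna_bool
    return filled

rec = {"decision_monetary": None, "DECISION_VISIBILITY_OTHER": None}
out = fill_record(rec, ["decision_monetary"], ["DECISION_VISIBILITY_OTHER"])
```

After the call, the raw string column holds "N/A" and the exploded boolean column holds False.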

dsa_tdb.etl.loadFile(dump_files_pattern: str | List[str], spark: SparkSession, platforms_to_exclude: List[str] | None = None, columns_to_import: List[TDB_columnsFull] | None = None, columns_datetime: List[TDB_datetimeColumns] | None = None, content_date_range: List[datetime] | None = None, decision_date_range: List[datetime] | None = None, created_at_date_range: List[datetime] | None = None, input_format: str = 'csv', del_original: bool = True, explode_cols: bool = True, fillna_str: str | None = None, fillna_bool: bool | None = False, columns_to_fill_str: List[RawAndExplodedColumn] = [RawAndExplodedColumn.decision_monetary, RawAndExplodedColumn.decision_provision, RawAndExplodedColumn.decision_account, RawAndExplodedColumn.incompatible_content_illegal, RawAndExplodedColumn.content_language, RawAndExplodedColumn.territorial_scope, RawAndExplodedColumn.automated_decision, RawAndExplodedColumn.automated_detection], columns_to_fill_bool: list = ['DECISION_VISIBILITY_CONTENT_REMOVED', 'DECISION_VISIBILITY_CONTENT_DISABLED', 'DECISION_VISIBILITY_CONTENT_DEMOTED', 'DECISION_VISIBILITY_CONTENT_AGE_RESTRICTED', 'DECISION_VISIBILITY_CONTENT_INTERACTION_RESTRICTED', 'DECISION_VISIBILITY_CONTENT_LABELLED', 'DECISION_VISIBILITY_OTHER'], compute_time_to_action: bool = True, compute_restriction_duration: bool = True, normalize_platform_name: bool = False, normalize_content_type_other: bool = True) DataFrame

Loads the dataset from the csv or parquet dump files. Will force datetime columns to be datetime for empty files.

Parameters:
  • dump_files_pattern (str, List[str]) – The pattern of the dump files to load. Can be a glob pattern. If a list is provided, the files are sorted and then loaded.

  • platforms_to_exclude (list, optional) – The platforms to exclude, by default None. Pass the platform names with their simplified name (see dsa_tdb.fetch.prepare_daily_dumps for details).

  • columns_to_import (list, optional) – The columns to import, by default None

  • columns_datetime (list, optional) – The columns to parse as datetime, by default None

  • content_date_range (tuple, optional) – The range of the content date, by default None

  • decision_date_range (tuple, optional) – The range of the decision date, by default None

  • created_at_date_range (tuple, optional) – The range of the created at date (upload to the DB), by default None

  • input_format (str, optional) – The format of the dump files being read, by default 'csv'. Must be 'csv' or 'parquet'.

  • del_original (bool, optional) – Whether to delete the original columns after exploding them, by default True.

  • explode_cols (bool, optional) – Whether to explode the columns or not, by default True. When exploding the columns, dsa_tdb.types.columns_to_explode is used.

  • fillna_str (str, optional) – The value to fill the string columns with, by default None.

  • fillna_bool (bool, optional) – The value to fill the bool columns with, by default False

  • columns_to_fill_str (list, optional) –

    The list of columns to fill with fillna_str, by default:

    ['decision_monetary', 'decision_provision', 'decision_account', 'incompatible_content_illegal', 'content_language', 'territorial_scope', 'automated_decision', 'automated_detection']

  • columns_to_fill_bool (list, optional) – The list of columns to fill with fillna_bool, by default dsa_tdb.types.columns_to_explode.

  • compute_time_to_action (bool, optional) – Whether to compute the time to action and reporting, by default True

  • compute_restriction_duration (bool, optional) – Whether to compute the restriction duration time when available, by default True

  • normalize_platform_name (bool, optional) – Whether to coalesce the platform name when a given platform's name changed over time, by default False. See dsa_tdb.types.coalesce_platforms_names for details.

  • normalize_content_type_other (bool, optional) – Whether to normalize the content type other column, by default True. See dsa_tdb.types.CONTENT_TYPE_OTHER_NORMALIZATION for details.

Returns:

The dataframe with the exploded columns. This is a spark dataframe.

Return type:

DataFrame

dsa_tdb.etl.loadSparkDataset(files_pattern: str, spark: SparkSession | None = None, sql_view_name: str = 'dsa_tdb_dataset', memory_limit: str | None = None, n_workers: int = 0, spark_local_dir: str | None = None) tuple[SparkSession, DataFrame]

Initialize a spark session with default configuration, load the dataset, and register it as an SQL view.

Parameters:
  • files_pattern (str) – Path to the dataset to load with spark.

  • spark (SparkSession, optional) – An optional spark session to use.

  • sql_view_name (str, optional) – An optional name for the created SQL view on the dataset. Defaults to ‘dsa_tdb_dataset’.

  • memory_limit (str, optional) – Spark memory limit.

  • n_workers (int, optional) – Number of spark workers.

  • spark_local_dir (str, optional) – Spark local dir value.

Returns:

A tuple of the spark session and the loaded spark dataframe.

Return type:

tuple[SparkSession, DataFrame]

dsa_tdb.etl.sparkExplodeColumns(df: DataFrame, spark: SparkSession, delete_original: bool = False, normalize_content_type_other: bool = False) DataFrame

This function adds the exploded columns to the spark dataframe.

Note

Some free-text columns have names similar to the exploded columns (see dsa_tdb.types.EXPLODED_COLUMNS), with the difference that the raw columns are lowercase and the exploded columns are uppercase. Remember that lowercase names always refer to the raw, free-text columns, whereas uppercase column names always refer to the exploded columns.

Note

The only array column not exploded is the territorial_scope column. This is because the column is guaranteed to be sorted and it is mapped internally in the territories2label function.

Parameters:
  • df (DataFrame) – The spark dataframe.

  • spark (SparkSession) – The spark session handle.

  • delete_original (bool, optional) – Whether to delete the original columns after exploding them, by default False.

  • normalize_content_type_other (bool, optional) – Whether to normalize the content type other column, by default False. See the dsa_tdb.types.CONTENT_TYPE_OTHER_NORMALIZATION for details.

Returns:

The spark dataframe with the horizontally exploded columns.

Return type:

DataFrame

dsa_tdb.etl.sparkSortArrayValues(df: DataFrame, spark: SparkSession) DataFrame

This function sorts the array values in the spark dataframe. It expects the columns to be exploded to be filled with strings like '["val1","val2",…]' or an empty string / null.

Note

The only array column not exploded is the territorial_scope column. This is because the column is guaranteed to be sorted and it is mapped internally in the territories2label function.

Parameters:
  • df (DataFrame) – The spark dataframe.

  • spark (SparkSession) – The spark session handle.

Returns:

The spark dataframe with the columns to explode sorted.

Return type:

DataFrame