dsa_tdb.etl module
- dsa_tdb.etl.aggregate_SoRs(spark: SparkSession, df: DataFrame, columns_to_group: List[RawAndExplodedColumns], horizontally_exploded_columns: bool = False, delete_original_columns: bool = False, normalize_platform_name: bool = False, platforms_to_exclude: List[str] | None = None, created_at_dt_floor: str = 'day', config_file: str | None = None, **kwargs) DataFrame
Aggregates the SoRs from the dataframe.
- Parameters:
spark (SparkSession) – The spark session handle.
df (DataFrame) – The input dataframe.
columns_to_group (List[T.RawAndExplodedColumns]) – The columns to group by.
horizontally_exploded_columns (bool, optional) – Whether to horizontally explode the columns or not, by default False
delete_original_columns (bool, optional) – Whether to delete the original columns after horizontally exploding them, by default False
normalize_platform_name (bool, optional) – Whether to coalesce the platform name when a given platform's name changed over time, by default False. See dsa_tdb.types.coalesce_platforms_names for details.
platforms_to_exclude (Union[List[str], None], optional) – The platforms to exclude, by default None.
created_at_dt_floor (str, optional) – The date floor for the created_at column, by default 'day'. See the [spark documentation for the date_trunc function](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_trunc.html) for other possible values.
config_file (str, optional) – The configuration file, by default None.
**kwargs (dict) – The aggregation arguments. These are all the remaining entries of dsa_tdb.types.AggregationConfig that are not directly exposed in the function arguments.
- Returns:
The aggregated dataframe.
- Return type:
DataFrame
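The grouping semantics of aggregate_SoRs can be sketched in plain Python (no Spark). This is a hypothetical miniature, assuming the aggregation counts SoRs per unique combination of the grouping columns; the record fields and values are illustrative only:

```python
from collections import Counter

# Each dict stands in for one SoR record (illustrative fields/values).
sors = [
    {"platform_name": "A", "decision_monetary": "DECISION_MONETARY_SUSPENSION"},
    {"platform_name": "A", "decision_monetary": "DECISION_MONETARY_SUSPENSION"},
    {"platform_name": "B", "decision_monetary": "DECISION_MONETARY_OTHER"},
]
columns_to_group = ["platform_name", "decision_monetary"]

# Count statements per unique combination of the grouping columns,
# analogous to a Spark groupBy(...).count().
counts = Counter(tuple(s[c] for c in columns_to_group) for s in sors)
```

With Spark, the same shape of result comes back as a DataFrame with one row per group.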
- dsa_tdb.etl.filter_SoRs(spark: SparkSession, df: DataFrame, columns_to_import: List[TDB_columnsFull] | None = None, horizontally_explode_columns: bool = True, delete_original_columns: bool = False, normalize_platform_name: bool = False, platforms_to_exclude: List[str] | None = None, platforms_to_include: List[str] | None = None, created_at_dt_floor: str | None = None, config_file: str | None = None, **kwargs) DataFrame
Filters the SoRs from the dataframe. The configuration can be passed either using the provided and additional keyword arguments or by providing a configuration file in config_file. Note that if both are provided, the keyword arguments will take precedence.
- Parameters:
spark (SparkSession) – The spark session handle.
df (DataFrame) – The input dataframe.
columns_to_import (list, optional) – The columns to select from the df before exploding them, by default None, in which case all the columns are imported.
horizontally_explode_columns (bool, optional) – Whether to horizontally explode the columns or not, by default True.
delete_original_columns (bool, optional) – Whether to delete the original columns after exploding them, by default False.
normalize_platform_name (bool, optional) – Whether to normalize the platform name, by default False.
platforms_to_exclude (list, optional) – The platforms to exclude, by default None.
platforms_to_include (list, optional) – The platforms to include, by default None.
created_at_dt_floor (str, optional) – The datetime floor for the created_at column, by default None.
config_file (str, optional) – The configuration file, by default None.
**kwargs (dict) – The filter arguments. These are all the remaining entries of dsa_tdb.types.FilteringConfig that are not directly exposed in the function arguments.
- Returns:
The filtered dataframe.
- Return type:
DataFrame
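The platforms_to_include / platforms_to_exclude semantics can be sketched in plain Python. This is an assumption-laden illustration: the helper name is hypothetical, and applying inclusion before exclusion is an assumption, not confirmed by the source:

```python
# Hypothetical sketch of the platform filters; both are applied when provided.
def filter_platforms(rows, platforms_to_include=None, platforms_to_exclude=None):
    out = rows
    if platforms_to_include is not None:
        out = [r for r in out if r["platform_name"] in platforms_to_include]
    if platforms_to_exclude is not None:
        out = [r for r in out if r["platform_name"] not in platforms_to_exclude]
    return out

rows = [{"platform_name": p} for p in ("A", "B", "C")]
kept = filter_platforms(rows, platforms_to_include=["A", "B"],
                        platforms_to_exclude=["B"])
```

In the Spark version these would be `isin` / negated `isin` filters on the platform_name column.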
- dsa_tdb.etl.loadDataset(df: DataFrame, spark: SparkSession, del_original: bool = True, explode_cols: bool = True, fillna_str: str | None = None, fillna_bool: bool | None = False, columns_to_fill_str: list = ['decision_monetary', 'decision_provision', 'decision_account', 'incompatible_content_illegal', 'content_language', 'territorial_scope', 'automated_decision', 'automated_detection'], columns_to_fill_bool: list = ['DECISION_VISIBILITY_CONTENT_REMOVED', 'DECISION_VISIBILITY_CONTENT_DISABLED', 'DECISION_VISIBILITY_CONTENT_DEMOTED', 'DECISION_VISIBILITY_CONTENT_AGE_RESTRICTED', 'DECISION_VISIBILITY_CONTENT_INTERACTION_RESTRICTED', 'DECISION_VISIBILITY_CONTENT_LABELLED', 'DECISION_VISIBILITY_OTHER'], compute_time_to_action: bool = True, compute_restriction_duration: bool = True, normalize_platform_name: bool = False, normalize_content_type_other: bool = True) DataFrame
The actual ETL. For each column in the columns to explode, creates one column per possible value and fills it with a boolean checking for that value.
Operates on the input df without copying it.
- Parameters:
df (pyspark.sql.DataFrame) – The input dataframe.
spark (SparkSession) – The spark session handle.
del_original (bool, optional) – Whether to delete the original columns after exploding them, by default True.
explode_cols (bool, optional) – Whether to explode the columns or not, by default True. When exploding, the columns in dsa_tdb.types.columns_to_explode are used.
fillna_str (str, optional) – The value to fill the string columns with, by default None.
fillna_bool (bool, optional) – The value to fill the bool columns with, by default False
columns_to_fill_str (list, optional) – The list of columns to fill with fillna_str, by default: ['decision_monetary', 'decision_provision', 'decision_account', 'incompatible_content_illegal', 'content_language', 'territorial_scope', 'automated_decision', 'automated_detection'].
columns_to_fill_bool (list, optional) – The list of columns to fill with fillna_bool, by default: dsa_tdb.types.columns_to_explode['decision_visibility'].
compute_time_to_action (bool, optional) – Whether to compute the time to action and reporting, by default True.
compute_restriction_duration (bool, optional) – Whether to compute the restriction duration time when available, by default True
normalize_platform_name (bool, optional) – Whether to coalesce the platform name when a given platform's name changed over time, by default False. See dsa_tdb.types.coalesce_platforms_names for details.
normalize_content_type_other (bool, optional) – Whether to normalize the content_type_other column, by default True. See dsa_tdb.types.CONTENT_TYPE_OTHER_NORMALIZATION for details.
- Returns:
The dataframe with the exploded columns.
- Return type:
DataFrame
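The fill-missing step described by fillna_str / fillna_bool and their column lists can be sketched per record in plain Python. The helper name is hypothetical, and the 'N/A' fill value shown is only an example:

```python
# Hypothetical sketch: string columns get fillna_str, boolean (exploded)
# columns get fillna_bool, wherever the value is missing.
def fill_missing(record, columns_to_fill_str, columns_to_fill_bool,
                 fillna_str="N/A", fillna_bool=False):
    filled = dict(record)
    for c in columns_to_fill_str:
        if filled.get(c) is None:
            filled[c] = fillna_str
    for c in columns_to_fill_bool:
        if filled.get(c) is None:
            filled[c] = fillna_bool
    return filled

rec = {"decision_monetary": None, "DECISION_VISIBILITY_OTHER": None}
out = fill_missing(rec, ["decision_monetary"], ["DECISION_VISIBILITY_OTHER"])
```

In Spark this corresponds to `df.fillna(fillna_str, subset=columns_to_fill_str)` and the analogous call for the boolean columns.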
- dsa_tdb.etl.loadFile(dump_files_pattern: str | List[str], spark: SparkSession, platforms_to_exclude: List[str] | None = None, columns_to_import: List[TDB_columnsFull] | None = None, columns_datetime: List[TDB_datetimeColumns] | None = None, content_date_range: List[datetime] | None = None, decision_date_range: List[datetime] | None = None, created_at_date_range: List[datetime] | None = None, input_format: str = 'csv', del_original: bool = True, explode_cols: bool = True, fillna_str: str | None = None, fillna_bool: bool | None = False, columns_to_fill_str: List[RawAndExplodedColumn] = [RawAndExplodedColumn.decision_monetary, RawAndExplodedColumn.decision_provision, RawAndExplodedColumn.decision_account, RawAndExplodedColumn.incompatible_content_illegal, RawAndExplodedColumn.content_language, RawAndExplodedColumn.territorial_scope, RawAndExplodedColumn.automated_decision, RawAndExplodedColumn.automated_detection], columns_to_fill_bool: list = ['DECISION_VISIBILITY_CONTENT_REMOVED', 'DECISION_VISIBILITY_CONTENT_DISABLED', 'DECISION_VISIBILITY_CONTENT_DEMOTED', 'DECISION_VISIBILITY_CONTENT_AGE_RESTRICTED', 'DECISION_VISIBILITY_CONTENT_INTERACTION_RESTRICTED', 'DECISION_VISIBILITY_CONTENT_LABELLED', 'DECISION_VISIBILITY_OTHER'], compute_time_to_action: bool = True, compute_restriction_duration: bool = True, normalize_platform_name: bool = False, normalize_content_type_other: bool = True) DataFrame
Loads the dataset from the csv dump file. Will force datetime columns to be datetime for empty files.
- Parameters:
dump_files_pattern (str, List[str]) – The pattern of the dump files to load. Can be a glob pattern. If a list is provided, all the files provided are sorted and loaded.
platforms_to_exclude (list, optional) – The platforms to exclude, by default None. Pass the platform names with their simplified name (see dsa_tdb.fetch.prepare_daily_dumps for details).
columns_to_import (list, optional) – The columns to import, by default None.
columns_datetime (list, optional) – The columns to parse as datetime, by default None
content_date_range (tuple, optional) – The range of the content date, by default None
decision_date_range (tuple, optional) – The range of the decision date, by default None
created_at_date_range (tuple, optional) – The range of the created at date (upload to the DB), by default None
input_format (str, optional) – The format of the dump file being read, by default 'csv'. Shall be either 'csv' or 'parquet'.
del_original (bool, optional) – Whether to delete the original columns after exploding them, by default True.
explode_cols (bool, optional) – Whether to explode the columns or not, by default True. When exploding, the columns in dsa_tdb.types.columns_to_explode are used.
fillna_str (str, optional) – The value to fill the string columns with, by default None.
fillna_bool (bool, optional) – The value to fill the bool columns with, by default False
columns_to_fill_str (list, optional) – The list of columns to fill with fillna_str, by default: ['decision_monetary', 'decision_provision', 'decision_account', 'incompatible_content_illegal', 'content_language', 'territorial_scope', 'automated_decision', 'automated_detection'].
columns_to_fill_bool (list, optional) – The list of columns to fill with fillna_bool, by default: dsa_tdb.types.columns_to_explode.
compute_time_to_action (bool, optional) – Whether to compute the time to action and reporting, by default True.
compute_restriction_duration (bool, optional) – Whether to compute the restriction duration time when available, by default True
normalize_platform_name (bool, optional) – Whether to coalesce the platform name when a given platform's name changed over time, by default False. See dsa_tdb.types.coalesce_platforms_names for details.
normalize_content_type_other (bool, optional) – Whether to normalize the content_type_other column, by default True. See dsa_tdb.types.CONTENT_TYPE_OTHER_NORMALIZATION for details.
- Returns:
The dataframe with the exploded columns. This is a spark dataframe.
- Return type:
DataFrame
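The three date-range filters of loadFile (content_date_range, decision_date_range, created_at_date_range) share one shape, sketched here in plain Python. The helper name is hypothetical, and treating the ranges as inclusive [start, end] bounds is an assumption:

```python
from datetime import datetime

# Hypothetical sketch: a record is kept only if every provided range
# contains the corresponding timestamp; a None range means "no filter".
def in_range(ts, date_range):
    return date_range is None or (date_range[0] <= ts <= date_range[1])

record_created_at = datetime(2024, 3, 15)
created_at_date_range = [datetime(2024, 3, 1), datetime(2024, 3, 31)]
keep = in_range(record_created_at, created_at_date_range)
```

In Spark these become `between` filters on the parsed datetime columns.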
- dsa_tdb.etl.loadSparkDataset(files_pattern: str, spark: SparkSession | None = None, sql_view_name: str = 'dsa_tdb_dataset', memory_limit: str | None = None, n_workers: int = 0, spark_local_dir: str | None = None) tuple[SparkSession, DataFrame]
Initializes a spark session with a default configuration (unless one is provided) and loads the dataset.
- Parameters:
files_pattern (str) – Path to the dataset to load with spark.
spark (SparkSession, optional) – An optional spark session to use.
sql_view_name (str, optional) – An optional name for the created SQL view on the dataset. Defaults to ‘dsa_tdb_dataset’.
memory_limit (str, optional) – Spark memory limit.
n_workers (int, optional) – Number of spark workers.
spark_local_dir (str, optional) – Spark local dir value.
- Returns:
A tuple of the spark session and the loaded spark dataframe.
- Return type:
tuple[SparkSession, DataFrame]
- dsa_tdb.etl.sparkExplodeColumns(df: DataFrame, spark: SparkSession, delete_original: bool = False, normalize_content_type_other: bool = False) DataFrame
This function adds the exploded columns to the spark dataframe.
Note
Some free-text columns have names similar to the exploded columns (see dsa_tdb.types.EXPLODED_COLUMNS), the difference being that the raw columns are lowercase and the exploded columns are uppercase. Remember that lowercase names always refer to the raw, free-text columns, whereas uppercase names always refer to the exploded columns.
Note
The only array column not exploded is the territorial_scope column. This is because the column is guaranteed to be sorted and we map it internally in the territories2label function.
- Parameters:
df (DataFrame) – The spark dataframe.
spark (SparkSession) – The spark session handle.
delete_original (bool, optional) – Whether to delete the original columns after exploding them, by default False.
normalize_content_type_other (bool, optional) – Whether to normalize the content_type_other column, by default False. See dsa_tdb.types.CONTENT_TYPE_OTHER_NORMALIZATION for details.
- Returns:
The spark dataframe with the horizontally exploded columns.
- Return type:
DataFrame
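The lowercase-raw / uppercase-exploded convention noted above can be illustrated per row in plain Python. This is a hypothetical miniature: the JSON-like encoding of the raw column and the set of possible values are assumptions for illustration:

```python
import json

# Hypothetical: assume the raw lowercase column holds a JSON-like list of
# values, and each possible value becomes an uppercase boolean column.
possible_values = ["decision_visibility_content_removed",
                   "decision_visibility_other"]

def explode_row(row, raw_col="decision_visibility"):
    present = set(json.loads(row.get(raw_col) or "[]"))
    out = dict(row)
    for v in possible_values:
        out[v.upper()] = v in present   # boolean check for the value
    return out

row = {"decision_visibility": '["decision_visibility_other"]'}
out_row = explode_row(row)
```

The raw lowercase column is kept alongside the new uppercase columns unless delete_original is set.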
- dsa_tdb.etl.sparkSortArrayValues(df: DataFrame, spark: SparkSession) DataFrame
This function sorts the array values in the spark dataframe. It expects the columns to be exploded to be filled with strings like '["val1","val2",…]' or an empty string / null.
Note
The only array column not exploded is the territorial_scope column. This is because the column is guaranteed to be sorted and we map it internally in the territories2label function.
- Parameters:
df (DataFrame) – The spark dataframe.
spark (SparkSession) – The spark session handle.
- Returns:
The spark dataframe with the columns to explode sorted.
- Return type:
DataFrame
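The per-value sorting that sparkSortArrayValues applies to each '["val1","val2",…]' string can be sketched in plain Python; the helper name is hypothetical and the exact serialization (no spaces after commas) is an assumption:

```python
import json

# Hypothetical sketch: parse the array string, sort the values, re-serialize.
# Empty strings and nulls pass through unchanged, as the docstring requires.
def sort_array_string(s):
    if not s:
        return s
    return json.dumps(sorted(json.loads(s)), separators=(",", ":"))

sorted_s = sort_array_string('["b","a","c"]')
```

Sorting the values gives every row a canonical form, so equal sets of values compare equal as strings in later group-bys.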