dsa_tdb package

Module contents

The dsa_tdb module documentation.

The dsa_tdb module provides a set of tools to interact with the DSA Transparency Database (TDB) data. It offers classes and functions to fetch, extract, transform, filter, and load data from the TDB.

It internally uses pyspark to handle the data at scale, even on regular computers, and can easily be introduced into pipelines using pandas or other data manipulation libraries.

class dsa_tdb.TDB_DataFrame(spark: SparkSession | None = None)

Bases: object

The base class for the TDB DataFrame object. This class is used to load the TDB data into a DataFrame and perform operations on it, such as filtering and aggregating the data and exporting it to other formats.

The class is initialized with a SparkSession object and provides a set of methods to load data from the TDB.

The underlying DataFrame object is a Spark DataFrame, accessible through the df attribute, and can be used as such.
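For orientation, a minimal construction sketch (the Spark application name is illustrative, and whether a default session is created when spark is omitted is not detailed here):

  from pyspark.sql import SparkSession

  import dsa_tdb

  # Reuse an existing SparkSession; passing None is also allowed per the signature.
  spark = SparkSession.builder.appName("tdb-example").getOrCreate()
  tdb = dsa_tdb.TDB_DataFrame(spark=spark)

  # Once data is loaded (see loadData / loadParquet below), the underlying
  # Spark DataFrame is accessible through tdb.df.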

aggregate_SoRs(columns_to_group: List[RawAndExplodedColumn] | None = None, horizontally_explode_columns: bool | None = None, delete_original_columns: bool | None = None, normalize_platform_name: bool | None = None, platforms_to_exclude: List[str] | None = None, platforms_to_include: List[str] | None = None, created_at_dt_floor: str | None = None, config_file: str | None = None, **kwargs)

Aggregates the SoRs from the dataframe. The configuration can be passed either using the provided and additional keyword arguments or by providing a configuration file in config_file. Note that if both are provided, the keyword arguments will take precedence.

Parameters:
  • columns_to_group (Union[List[T.RawAndExplodedColumn],None], optional) – The columns to group the data by; by default (None) all columns except uuid and platform_uid are used.

  • horizontally_explode_columns (bool, optional) – Whether to horizontally explode the columns with nested structures, by default True.

  • delete_original_columns (bool, optional) – Whether to delete the original columns after horizontally exploding them, by default False.

  • normalize_platform_name (bool, optional) – Whether to normalize the platform name to lowercase, by default False.

  • platforms_to_exclude (Union[List[str],None], optional) – The platforms to exclude from the data, by default None.

  • platforms_to_include (Union[List[str],None], optional) – The platforms to include in the data, by default None.

  • created_at_dt_floor (Union[str,None], optional) – The floor to round the created_at datetime to, by default None.

  • config_file (Union[str,None], optional) – The path to a configuration file, by default None.

  • **kwargs (dict) – The aggregation arguments. These are all the remaining entries of dsa_tdb.types.AggregationConfig that are not directly exposed in the function arguments.
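As an illustration, a minimal call sketch for aggregate_SoRs, reusing the tdb object from the construction example above; the platform names are placeholders, and any remaining dsa_tdb.types.AggregationConfig entries would be passed as extra keyword arguments (whether the aggregation replaces the data in tdb.df or returns a new object is not specified by the signature):

  # tdb is the TDB_DataFrame instance from the construction sketch above.
  tdb.aggregate_SoRs(
      horizontally_explode_columns=True,
      normalize_platform_name=True,
      platforms_to_include=["platform_a", "platform_b"],  # placeholder names
  )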

property columns
filter_SoRs(columns_to_import: List[TDB_columnsFull] | None = None, horizontally_explode_columns: bool = True, delete_original_columns: bool = False, normalize_platform_name: bool = False, platforms_to_exclude: List[str] | None = None, platforms_to_include: List[str] | None = None, created_at_dt_floor: str | None = None, config_file: str | None = None, **kwargs)

Filters the SoRs from the dataframe. The configuration can be passed either using the provided and additional keyword arguments or by providing a configuration file in config_file. Note that if both are provided, the keyword arguments will take precedence.

Parameters:
  • columns_to_import (Union[List[T.TDB_columnsFull],None], optional) – The columns to import from the dataframe, by default None.

  • horizontally_explode_columns (bool, optional) – Whether to horizontally explode the columns with nested structures, by default True.

  • delete_original_columns (bool, optional) – Whether to delete the original columns after horizontally exploding them, by default False.

  • normalize_platform_name (bool, optional) – Whether to normalize the platform name to lowercase, by default False.

  • platforms_to_exclude (Union[List[str],None], optional) – The platforms to exclude from the data, by default None.

  • platforms_to_include (Union[List[str],None], optional) – The platforms to include in the data, by default None.

  • created_at_dt_floor (Union[str,None], optional) – The floor to round the created_at datetime to, by default None.

  • config_file (Union[str,None], optional) – The path to a configuration file, by default None.

  • **kwargs (dict) – The filter arguments. These are all the remaining entries of dsa_tdb.types.FilteringConfig that are not directly exposed in the function arguments.
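A similar sketch for filter_SoRs, again reusing the tdb object from above; the platform name and configuration file path are placeholders:

  # tdb is the TDB_DataFrame instance from the construction sketch above.
  tdb.filter_SoRs(
      horizontally_explode_columns=True,
      platforms_to_exclude=["platform_c"],  # placeholder name
  )
  # Equivalently, the same settings could come from a configuration file:
  # tdb.filter_SoRs(config_file="configs/filtering.yaml")  # placeholder path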

head(n: int = 1)
loadData(root_folder: str, platform: str, version: TDB_dailyDumpsVersion = TDB_dailyDumpsVersion.full, platforms_to_exclude: List[str] | None = None, start_date: str | None = None, end_date: str | None = None, columns_to_import: List[str] | None = None, explode_columns: bool = False, delete_original: bool = True, fillna_str: str | None = None, fillna_bool: bool | None = False, input_format: TDB_chunkFormat = TDB_chunkFormat.parquet, content_date_range: List[str] | List[datetime] | None = None, decision_date_range: List[str] | List[datetime] | None = None, created_at_date_range: List[str] | List[datetime] | None = None, override_chunked_subfolder: str = 'daily_dumps_chunked', compute_restriction_duration: bool = False, normalize_platform_name: bool = True, normalize_content_type_other: bool = False)

Load data from the TDB into a Spark DataFrame.

This method loads the data from the TDB daily dumps into a Spark DataFrame. The data is read from the files in the given root folder for the requested platform and version, then filtered and transformed according to the options below.

Parameters:
  • root_folder (str) – The root folder where the daily dumps for each platform and version are stored.

  • platform (str) – The platform to load the data from.

  • version (T.TDB_dailyDumpsVersion, optional) – The version of the daily dumps to load, by default dsa_tdb.types.TDB_dailyDumpsVersion.full.

  • platforms_to_exclude (Union[List[str],None], optional) – A list of platforms to exclude from the data, by default None.

  • start_date (Union[str,None], optional) – The start date to load the data from, by default None.

  • end_date (Union[str,None], optional) – The end date to load the data to, by default None.

  • columns_to_import (Union[List[str],None], optional) – The list of columns to import from the daily dumps, by default None.

  • explode_columns (bool, optional) – Whether to horizontally explode the columns with nested structures, by default False.

  • delete_original (bool, optional) – Whether to delete the original files after horizontally exploding them, by default True.

  • fillna_str (Union[str,None], optional) – The value to fill the missing string values with, by default None.

  • fillna_bool (Union[bool,None], optional) – The value to fill the missing boolean values with, by default False.

  • input_format (T.TDB_chunkFormat, optional) – The format of the daily dump files, by default T.TDB_chunkFormat.parquet.

  • content_date_range (Union[List[str],List[datetime],None], optional) – The date range to filter the content_date column, by default None.

  • decision_date_range (Union[List[str],List[datetime],None], optional) – The date range to filter the decision_date column, by default None.

  • created_at_date_range (Union[List[str],List[datetime],None], optional) – The date range to filter the created_at column, by default None.

  • override_chunked_subfolder (str, optional) – The subfolder where the chunked files are stored, by default dsa_tdb.types.CHUNKED_FILES_SUBFOLDER_NAME. Do not change this unless you know what you are doing.

  • compute_restriction_duration (bool, optional) – Whether to compute the restriction duration from the restriction_start and restriction_end columns, by default False.

  • normalize_platform_name (bool, optional) – Whether to normalize the platform name to lowercase, by default True.

  • normalize_content_type_other (bool, optional) – Whether to normalize the content_type_other column to lowercase, by default False.

Raises:

ValueError – If content_date_range, decision_date_range, or created_at_date_range has more than two elements.
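A hedged end-to-end sketch for loadData, reusing the tdb object from the construction example; the root folder, platform name, and date strings are illustrative assumptions (the expected date string format is not documented here):

  import dsa_tdb.types as T

  # tdb is the TDB_DataFrame instance from the construction sketch above.
  tdb.loadData(
      root_folder="/data/tdb_daily_dumps",  # placeholder path
      platform="platform_a",                # placeholder platform
      version=T.TDB_dailyDumpsVersion.full,
      start_date="2024-01-01",              # assumed date format
      end_date="2024-01-31",
      input_format=T.TDB_chunkFormat.parquet,
  )
  # The loaded data is then available as a Spark DataFrame via tdb.df.
  tdb.show(5)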

loadParquet(path: str | List[str])

Load a parquet file into the DataFrame.

This method loads one or more parquet files into the underlying Spark DataFrame; a glob pattern or a list of paths can also be given.

Parameters:

path (str, List[str]) – The path to the parquet file. Can also be a pattern to load multiple files or a list of paths.
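A short usage sketch, reusing the tdb object from the construction example; the file paths are placeholders:

  # tdb is the TDB_DataFrame instance from the construction sketch above.
  tdb.loadParquet("/data/exports/sors_january.parquet")  # single file
  tdb.loadParquet([                                       # or a list of paths
      "/data/exports/sors_january.parquet",
      "/data/exports/sors_february.parquet",
  ])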

sample(n: int = 1)
schema()
show(n: int = 20)
text_filter(column: str, expr: str)

Filter the DataFrame using a text expression.

Parameters:
  • column (str) – The column to filter on.

  • expr (str) – The expression to filter with. Can also be a regular expression.
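For example, the following sketch keeps rows whose category column matches a pattern; the column name and the expression are illustrative, not prescribed values:

  # tdb is the TDB_DataFrame instance from the construction sketch above.
  tdb.text_filter(column="category", expr="SPAM|SCAM")  # plain text or regex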

toPandas() → DataFrame
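Convert the data to a pandas DataFrame. Since this collects the full dataset to the driver (standard Spark behaviour, not specific to this library), it is typically applied after filtering or aggregation; a short hand-off sketch:

  # tdb is the TDB_DataFrame instance from the construction sketch above.
  pdf = tdb.toPandas()  # pandas DataFrame, materialized on the driver
  print(pdf.head())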