Skip to content

Size

Size(dataset, column_list=None, filter_condition_dict=None, dataset_filter_by_data_type=None, duckdb_connection=None, decimal_places=2)

Calculate size metrics for a dataset using DuckDB.

This function calculates the total number of rows in a dataset, with optional filtering and column validation capabilities.

Example

Consider a dataset with 1000 rows and a filter condition {'category': 'electronics'}

Size calculation:

- Total rows without filter: 1000
- Total rows after filter: 200 (assuming 200 rows match the filter)

Returns:

Type Description
List[Dict[str, Union[str, int, float, dict, None]]]

A list of dictionaries, one per selected column, with the following keys:

- column_name (str): Name of the analyzed column
- row_count (int): Total number of rows in the table after filtering (a table-level count, so it is the same for every column)
- table_name (str): Name of the analyzed table
- execution_timestamp_utc (str): Timestamp of execution in UTC
- filter_conditions (dict|None): Applied filter conditions if any
- filtered_by_data_type (list|None): Data types used for filtering if any

Parameters:

Name Type Description Default
dataset Any

Input dataset that can be either:

- A DataFrame (pandas, polars) or other DuckDB-compatible data structure
- A string representing an existing table name in the DuckDB connection

required
column_list Optional[List[str]]

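List of column names to report on; every name must exist in the dataset. At least one of column_list or dataset_filter_by_data_type must be provided.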

None
filter_condition_dict Optional[Dict[str, Union[str, int, float]]]

Dictionary of filter conditions to apply before calculating size.

None
dataset_filter_by_data_type Optional[List[str]]

Data type(s) to filter columns.

None
duckdb_connection Optional[DuckDBPyConnection]

Existing DuckDB connection.

None
decimal_places int

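Number of decimal places for rounding. Must be non-negative; the value is validated but not otherwise used, because the row count is an integer.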

2
Source code in src/whistlingduck/analyzers/Size.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier with embedded quotes doubled."""
    return '"' + str(name).replace('"', '""') + '"'


def _escape_literal(text: str) -> str:
    """Return *text* with single quotes doubled, safe to place inside a SQL string literal."""
    return str(text).replace("'", "''")


def Size(dataset: Any, 
         column_list: Optional[List[str]] = None, 
         filter_condition_dict: Optional[Dict[str, Union[str, int, float]]] = None,
         dataset_filter_by_data_type: Optional[List[str]] = None,
         duckdb_connection: Optional[DuckDBPyConnection] = None,
         decimal_places: int = 2
        ) -> List[Dict[str, Union[str, int, float, dict, None]]]:
    """
    Calculate size metrics for a dataset using DuckDB.

    This function counts the rows of a dataset (optionally after applying equality
    filters) and reports that count once for every selected column. Columns are
    selected by name (``column_list``), by data type (``dataset_filter_by_data_type``),
    or both; the two selections are merged without duplicates.

    Example:
        Consider a dataset with 1000 rows and a filter condition {'category': 'electronics'}

        Size calculation:
        - Total rows without filter: 1000
        - Total rows after filter: 200 (assuming 200 rows match the filter)

    Returns:
        List[Dict[str, Union[str, int, float, dict, None]]]: A list of dictionaries, one per
        selected column (``column_list`` entries first, then data-type matches), with the
        following keys:
            - column_name (str): Name of the analyzed column
            - row_count (int): Total number of rows after filtering (identical for every column)
            - table_name (str): Name of the analyzed table
            - execution_timestamp_utc (str): Timestamp of execution in UTC
            - filter_conditions (dict|None): Applied filter conditions if any
            - filtered_by_data_type (list|None): Data types used for filtering if any

    Args:
        dataset (Any): Input dataset that can be either:
            - A DataFrame (pandas, polars) or other DuckDB-compatible data structure
            - A string representing an existing table name in the DuckDB connection
        column_list (Optional[List[str]], optional): List of column names to report on.
            Every name must exist in the dataset.
        filter_condition_dict (Optional[Dict[str, Union[str, int, float]]], optional):
            Dictionary of ``column == value`` conditions (combined with AND) to apply
            before calculating size.
        dataset_filter_by_data_type (Optional[List[str]], optional): 
            Data type(s) used to select columns (compared case-insensitively).
        duckdb_connection (Optional[DuckDBPyConnection], optional): Existing DuckDB connection.
            When omitted, a temporary connection is opened and always closed before returning.
        decimal_places (int, optional): Must be non-negative. It is only validated here;
            the row count is an integer, so nothing is rounded.

    Raises:
        ValueError: If ``decimal_places`` is negative, if no columns are selected, if an
            argument has the wrong container type, if the dataset cannot be registered or
            the named table does not exist, if a requested or filter column is missing, or
            if no column matches the requested data types.

    Note:
        When a dataset object is registered on a caller-supplied connection, the view named
        in ``table_name`` stays registered after a successful call (so the reported name
        remains usable); it is removed again if the call fails.
    """
    if decimal_places < 0:
        raise ValueError("decimal_places must be non-negative")

    # Pure-argument checks run before any connection is opened so that a bad call
    # never acquires (and then has to release) DuckDB resources.
    # Empty containers are rejected together with None: with nothing selected there
    # would be no query to run.
    if not column_list and not dataset_filter_by_data_type:
        raise ValueError(
            "Please provide either a list of columns using column_list or specify "
            "data type(s) using dataset_filter_by_data_type."
        )
    if column_list and not isinstance(column_list, list):
        raise ValueError(
            "column_list must be a list of strings. "
            "For single column, use ['column_name'] instead of 'column_name'."
        )
    if dataset_filter_by_data_type and not isinstance(dataset_filter_by_data_type, list):
        raise ValueError(
            "dataset_filter_by_data_type must be a list of strings. "
            "For single data type, use ['VARCHAR'] instead of 'VARCHAR'."
        )
    if filter_condition_dict and not isinstance(filter_condition_dict, dict):
        raise ValueError(
            "filter_condition_dict must be a dictionary. "
            "For single filter condition, use {'column_name': value} instead of a single value."
        )

    # Unique view name (UUID with underscores so it is a plain identifier) and UTC timestamp
    unique_id = str(uuid.uuid4()).replace('-', '_')
    timestamp = datetime.now(timezone.utc)
    temp_table_name = f"size_{unique_id}"

    owns_connection = duckdb_connection is None
    con = duckdb.connect() if owns_connection else duckdb_connection
    view_registered = False

    try:
        # Resolve the table to query: an existing table name on a supplied connection,
        # otherwise the dataset object registered as a temporary view.
        if not owns_connection and isinstance(dataset, str):
            try:
                con.sql(f"PRAGMA table_info('{_escape_literal(dataset)}')")
            except duckdb.CatalogException as e:
                raise ValueError(f"Table '{dataset}' does not exist in the DuckDB connection") from e
            source_table = dataset
        else:
            try:
                con.register(temp_table_name, dataset)
            except Exception as e:
                if owns_connection:
                    raise ValueError(
                        f"Failed to register dataset: {str(e)}. "
                        "Please ensure the dataset is in a DuckDB-compatible format."
                    ) from e
                raise ValueError(
                    f"Failed to register dataset with existing connection: {str(e)}. "
                    "Please ensure the dataset is in a DuckDB-compatible format."
                ) from e
            view_registered = True
            source_table = temp_table_name

        # Table metadata drives both column validation and data-type selection
        dtype_info = con.sql(f"PRAGMA table_info('{_escape_literal(source_table)}')").pl()
        dataset_columns = dtype_info['name'].to_list()
        known_columns = set(dataset_columns)

        # A dict is used as an ordered set: de-duplicates while keeping a stable order
        selected_columns: Dict[str, None] = {}

        if column_list:
            invalid_cols = sorted({str(col) for col in column_list if col not in known_columns})
            if invalid_cols:
                raise ValueError(
                    f"These columns were not found in the dataset: {', '.join(invalid_cols)}. "
                    "Please verify the column names."
                )
            selected_columns.update(dict.fromkeys(column_list))

        if dataset_filter_by_data_type:
            wanted_types = {dt.upper() for dt in dataset_filter_by_data_type}
            data_type_columns = [
                name
                for name, type_name in zip(dataset_columns, dtype_info['type'].to_list())
                if type_name.upper() in wanted_types
            ]
            if not data_type_columns:
                raise ValueError(
                    f"We couldn't find any columns of types {dataset_filter_by_data_type}. "
                    "You might want to check the data types or consider specifying columns directly using column_list."
                )
            selected_columns.update(dict.fromkeys(data_type_columns))

        # Build the WHERE clause from the equality filters
        where_clause = ""
        if filter_condition_dict:
            invalid_filter_cols = sorted(
                {str(col) for col in filter_condition_dict if col not in known_columns}
            )
            if invalid_filter_cols:
                raise ValueError(
                    f"We couldn't find these columns in your dataset: {', '.join(invalid_filter_cols)}. "
                    "Please verify the column names in your filter conditions."
                )
            # String values stay SQL string literals (so DuckDB can still cast them to the
            # column's type, e.g. '2020' against an integer column) but have their quotes
            # escaped; column names are quoted so spaces/keywords/quotes cannot break the query.
            where_clause = "WHERE " + " AND ".join(
                f"{_quote_identifier(col)} = '{_escape_literal(val)}'"
                if isinstance(val, str)
                else f"{_quote_identifier(col)} = {val}"
                for col, val in filter_condition_dict.items()
            )

        # COUNT(*) does not depend on the column, so it is computed once and reused
        # for every selected column instead of once per column.
        count_row = con.sql(
            f"SELECT COUNT(*) AS row_count FROM {source_table} {where_clause}"
        ).fetchone()
        row_count = int(count_row[0])

        execution_timestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S")
        return [
            {
                'column_name': column,
                'row_count': row_count,
                'table_name': source_table,
                'execution_timestamp_utc': execution_timestamp,
                'filter_conditions': filter_condition_dict if filter_condition_dict else None,
                'filtered_by_data_type': dataset_filter_by_data_type if dataset_filter_by_data_type else None
            }
            for column in selected_columns
        ]
    except Exception:
        # On failure the caller never learns the temporary view's name, so remove it from
        # their connection rather than leaving it (and the data it references) behind.
        if view_registered and not owns_connection:
            try:
                con.unregister(temp_table_name)
            except Exception:
                # Best-effort cleanup: the original error is the one worth reporting.
                pass
        raise
    finally:
        # A connection opened here is always closed, on success and on error alike
        if owns_connection:
            con.close()