PatternMatch

PatternMatch(dataset, pattern, column_list=None, filter_condition_dict=None, dataset_filter_by_data_type=None, duckdb_connection=None, decimal_places=2)

Calculate pattern match metrics for text columns in a dataset using DuckDB.

Analyzes text columns to determine the percentage of rows matching a given regular expression pattern. Supports column selection by name and/or data type filtering with optional row filtering conditions.

Details

The function evaluates regex pattern matches across specified text columns, calculating match rates and returning detailed statistics. Uses DuckDB for efficient pattern matching on large datasets.

Example

Consider an 'email' column: ["user@example.com", "invalid-email", "another@domain.com", "not-an-email", "test@test.com"]

Pattern matching with an email regex:
  • Matching values: "user@example.com", "another@domain.com", "test@test.com" (count = 3)
  • Total values: 5
  • Match percentage = (3/5) * 100 = 60%
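
A minimal runnable sketch of the example above. The import path follows the source location shown at the bottom of this page; the package may also re-export the function elsewhere.

    import pandas as pd
    from whistlingduck.analyzers.PatternMatch import PatternMatch

    df = pd.DataFrame({
        "email": ["user@example.com", "invalid-email", "another@domain.com",
                  "not-an-email", "test@test.com"]
    })

    # Loose email regex for illustration only; use a stricter one in practice.
    results = PatternMatch(df, pattern=r".+@.+\..+", column_list=["email"])
    print(results[0]["match_percentage"])  # 60.0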

Parameters

dataset : Any
   Input dataset (DataFrame or table name). Can be:
   - Pandas/Polars DataFrame or DuckDB-compatible structure
   - String representing an existing table name in the DuckDB connection

pattern : str
   Regular expression pattern to match against. Must be a valid regex.

column_list : Optional[List[str]]
   Columns to analyze. Example: ['email', 'username']
   Can be used with or without dataset_filter_by_data_type. Default: None.

dataset_filter_by_data_type : Optional[List[str]]
   Filter columns by SQL type. Example: ['VARCHAR', 'TEXT']
   Can be used with or without column_list. Default: None.

filter_condition_dict : Optional[Dict[str, Union[str, int, float]]]
   Row filter conditions. Example: {'department': 'IT'}
   Default: None.

duckdb_connection : Optional[DuckDBPyConnection]
   DuckDB connection. A temporary in-memory connection is created if None. Default: None.

decimal_places : int
   Decimal places for percentages. Must be >= 0. Default: 2.
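
A sketch using data-type selection together with a row filter (column names and values are illustrative):

    import pandas as pd
    from whistlingduck.analyzers.PatternMatch import PatternMatch

    df = pd.DataFrame({
        "username": ["alice01", "bob", "carol_9"],
        "department": ["IT", "HR", "IT"],
    })

    # Analyze every VARCHAR column, but only rows where department = 'IT'.
    results = PatternMatch(
        df,
        pattern=r"^[a-z]+",
        dataset_filter_by_data_type=["VARCHAR"],
        filter_condition_dict={"department": "IT"},
    )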

Returns

List[Dict[str, Union[str, int, float, dict, None]]]
   Analysis results per column:
   - column_name: Name of the analyzed column
   - match_count: Number of pattern matches
   - total_count: Total rows analyzed
   - match_percentage: Percentage of matching rows
   - table_name: Name of the analyzed table
   - execution_timestamp_utc: UTC timestamp of execution
   - filter_conditions: Applied filter conditions
   - filtered_by_data_type: Data-type filters used
   - pattern: Regex pattern used
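
For the email example above, one returned element would look roughly like this (the temporary table name and timestamp vary per run):

    {
        "column_name": "email",
        "match_count": 3,
        "total_count": 5,
        "match_percentage": 60.0,
        "table_name": "pattern_match_<uuid>",
        "execution_timestamp_utc": "2024-01-01 00:00:00",
        "filter_conditions": None,
        "filtered_by_data_type": None,
        "pattern": ".+@.+\\..+",
    }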

Raises

ValueError
   - decimal_places is negative
   - Invalid regex pattern
   - Neither column_list nor dataset_filter_by_data_type provided
   - Columns not found in the dataset
   - Selected columns are not string-typed
   - Filter columns not found in the dataset
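
All of these conditions surface as ValueError, so callers can guard with a single except clause (reusing df from the sketch above):

    try:
        PatternMatch(df, pattern="[unclosed", column_list=["email"])
    except ValueError as exc:
        print(f"Analysis failed: {exc}")  # e.g. "Invalid regular expression pattern"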

Source code in src/whistlingduck/analyzers/PatternMatch.py
import re
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Union

import duckdb
import polars as pl
from duckdb import DuckDBPyConnection


def PatternMatch(dataset: Any,
                 pattern: str,
                 column_list: Optional[List[str]] = None,
                 filter_condition_dict: Optional[Dict[str, Union[str, int, float]]] = None,
                 dataset_filter_by_data_type: Optional[List[str]] = None,
                 duckdb_connection: Optional[DuckDBPyConnection] = None,
                 decimal_places: int = 2
                 ) -> List[Dict[str, Union[str, int, float, dict, None]]]:
    """
    Calculate pattern match metrics for text columns in a dataset using DuckDB.

    Analyzes text columns to determine the percentage of rows matching a given regular expression pattern.
    Supports column selection by name and/or data type filtering with optional row filtering conditions.

    Details
    -------
    The function evaluates regex pattern matches across specified text columns, calculating match rates 
    and returning detailed statistics. Uses DuckDB for efficient pattern matching on large datasets.

    Example
    -------
    Consider an 'email' column:
       ["user@example.com", "invalid-email", "another@domain.com", "not-an-email", "test@test.com"]

    Pattern matching with email regex:
       - Matching values: "user@example.com", "another@domain.com", "test@test.com" (count = 3) 
       - Total values: 5
       - Match percentage = (3/5) * 100 = 60%

    Parameters
    ----------
    dataset : Any
       Input dataset (DataFrame or table name). Can be:
       - Pandas/Polars DataFrame or DuckDB-compatible structure
       - String representing existing table name in DuckDB connection

    pattern : str
       Regular expression pattern to match against. Must be valid regex.

    column_list : Optional[List[str]]
       Columns to analyze. Example: ['email', 'username']
       Used with/without dataset_filter_by_data_type. Default: None.

    dataset_filter_by_data_type : Optional[List[str]] 
       Filter columns by SQL type. Example: ['VARCHAR', 'TEXT']
       Used with/without column_list. Default: None.

    filter_condition_dict : Optional[Dict[str, Union[str, int, float]]]
       Row filter conditions. Example: {'department': 'IT'}
       Default: None.

    duckdb_connection : Optional[DuckDBPyConnection] 
       DuckDB connection. Creates temporary if None.
       Default: None.

    decimal_places : int
       Decimal places for percentages. Must be >= 0.
       Default: 2.

    Returns
    -------
    List[Dict[str, Union[str, int, float, dict, None]]]
       Analysis results per column:
       - column_name: Name of analyzed column
       - match_count: Number of pattern matches
       - total_count: Total rows analyzed
       - match_percentage: Percent of matching rows
       - table_name: Analyzed table name
       - execution_timestamp_utc: UTC timestamp
       - filter_conditions: Applied filters
       - filtered_by_data_type: Type filters used
       - pattern: Regex pattern used

    Raises
    ------
    ValueError
       - Negative decimal_places
       - Invalid regex pattern
       - No column selection method 
       - Columns not found
       - Non-string columns
       - Invalid filter columns
    """

    if decimal_places < 0:
        raise ValueError("decimal_places must be non-negative")

    try:
        re.compile(pattern)
    except re.error:
        raise ValueError("Invalid regular expression pattern")

    if column_list is None and dataset_filter_by_data_type is None:
        raise ValueError(
            "Please provide either column_list or dataset_filter_by_data_type"
        )

    # Unique suffix prevents temp-table name collisions across calls
    unique_id = str(uuid.uuid4()).replace('-', '_')
    timestamp = datetime.now(timezone.utc)
    temp_table_name = f"pattern_match_{unique_id}"

    if duckdb_connection is None:
        con = duckdb.connect()
        try:
            con.register(temp_table_name, dataset)
            source_table = temp_table_name
        except Exception as e:
            con.close()
            raise ValueError(f"Failed to register dataset: {str(e)}")
    else:
        con = duckdb_connection
        if isinstance(dataset, str):
            try:
                con.sql(f"PRAGMA table_info('{dataset}')")
                source_table = dataset
            except duckdb.CatalogException:
                raise ValueError(f"Table '{dataset}' does not exist")
        else:
            try:
                con.register(temp_table_name, dataset)
                source_table = temp_table_name
            except Exception as e:
                raise ValueError(f"Failed to register dataset: {str(e)}")

    # Inspect the table schema: column names and their DuckDB data types
    dtype_info = con.sql(f"PRAGMA table_info('{source_table}')").pl()
    dataset_columns = dtype_info['name'].to_list()

    final_column_list = set()

    if column_list:
        if not isinstance(column_list, list):
            raise ValueError(
                "column_list must be a list of strings"
            )
        invalid_cols = set(column_list) - set(dataset_columns)
        if invalid_cols:
            raise ValueError(f"Columns not found: {', '.join(invalid_cols)}")
        final_column_list.update(column_list)

    string_types = {'VARCHAR', 'TEXT', 'CHAR', 'STRING'}
    if dataset_filter_by_data_type:
        if not isinstance(dataset_filter_by_data_type, list):
            raise ValueError(
                "dataset_filter_by_data_type must be a list of strings"
            )

        data_type_columns = dtype_info.filter(
            pl.col("type").str.to_uppercase().is_in([dt.upper() for dt in dataset_filter_by_data_type])
        )['name'].to_list()

        if not data_type_columns:
            raise ValueError(
                f"No columns found of types {dataset_filter_by_data_type}"
            )

        final_column_list.update(data_type_columns)

    final_column_list = list(final_column_list)

    # Every selected column must be a string type for regex matching
    for column in final_column_list:
        column_type = dtype_info.filter(pl.col("name") == column)['type'].item().upper()
        if not any(string_type in column_type for string_type in string_types):
            raise ValueError(
                f"Column '{column}' must be a string type"
            )

    if filter_condition_dict:
        if not isinstance(filter_condition_dict, dict):
            raise ValueError("filter_condition_dict must be a dictionary")

        invalid_filter_cols = set(filter_condition_dict.keys()) - set(dataset_columns)
        if invalid_filter_cols:
            raise ValueError(f"Filter columns not found: {', '.join(invalid_filter_cols)}")

        # Escape single quotes in string values so they embed safely in the SQL literal
        quote = "'"
        where_clause = "WHERE " + " AND ".join(
            f"{col} = '{val.replace(quote, quote * 2)}'" if isinstance(val, str)
            else f"{col} = {val}"
            for col, val in filter_condition_dict.items()
        )
    else:
        where_clause = ""

    # Escape single quotes so the regex embeds safely as a SQL string literal;
    # the original pattern is still reported unmodified in the results.
    sql_pattern = pattern.replace("'", "''")

    sql_statements = [
        f"""
        SELECT
            '{column}' as column_name,
            COUNT(CASE WHEN regexp_matches({column}, '{sql_pattern}') THEN 1 END) as match_count,
            COUNT(*) as total_count,
            ROUND(CAST(COUNT(CASE WHEN regexp_matches({column}, '{sql_pattern}') THEN 1 END) AS DOUBLE) * 100.0 /
                  NULLIF(COUNT(*), 0), {decimal_places}) as match_percentage
        FROM {source_table}
        {where_clause}
        """
        for column in final_column_list
    ]

    sql_query = " UNION ALL ".join(sql_statements)
    result = con.sql(sql_query).pl()

    if duckdb_connection is None:
        con.close()

    results = result.select([
        pl.col('column_name'),
        pl.col('match_count').cast(pl.Int64),
        pl.col('total_count').cast(pl.Int64),
        pl.col('match_percentage').cast(pl.Float64)
    ]).to_dicts()

    # Attach run metadata to each per-column result
    for row in results:
        row.update({
            'table_name': source_table,
            'execution_timestamp_utc': timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            'filter_conditions': filter_condition_dict if filter_condition_dict else None,
            'filtered_by_data_type': dataset_filter_by_data_type if dataset_filter_by_data_type else None,
            'pattern': pattern
        })

    return results
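
A sketch of reusing an existing DuckDB connection with a table that already lives in it (table and column names are illustrative):

    import duckdb
    from whistlingduck.analyzers.PatternMatch import PatternMatch

    con = duckdb.connect()
    con.sql("CREATE TABLE users (email VARCHAR)")
    con.sql("INSERT INTO users VALUES ('user@example.com'), ('not-an-email')")

    # Passing a table name plus a connection analyzes the table in place;
    # the function leaves caller-owned connections open.
    results = PatternMatch(
        "users",
        pattern=r".+@.+\..+",
        column_list=["email"],
        duckdb_connection=con,
    )
    con.close()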