Cross-Column Rules

Rules for validating relationships between multiple columns.

CrossColumnRule

Bases: CustomRule

Base class for cross-column validation rules.

Cross-column rules validate relationships between multiple columns in a DataFrame. They can check equality, comparisons, arithmetic operations, conditional logic, and referential integrity.

Attributes:

name: Human-readable rule name.

description: Description of what the rule checks.

source_columns: List of column names to check. At least 2 are required, except for "referential" rules, which need only 1.

operation: Type of operation to perform. Column A is the first source column and Column B is the second. Valid options:
- "equals": Column A must equal Column B
- "greater_than": Column A must be greater than Column B
- "less_than": Column A must be less than Column B
- "sum_equals": Sum of source columns must equal target column
- "conditional": If condition is met, then check another condition
- "referential": Values in source column must exist in target column

target_column: Target column. Required for "sum_equals" and "referential", otherwise optional.

condition: Dictionary required for "conditional" rules, containing:
- "if_column": Column to check condition on
- "if_value": Value to check for
- "then_column": Column to validate if condition is met
- "then_value": Expected value in then_column
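
To make the four condition keys concrete, here is a minimal pure-Python sketch of the check they describe. It assumes rows are plain dictionaries and that None stands in for a null. The helper name conditional_violations is hypothetical and not part of lavendertown; it only mirrors the documented semantics.

```python
from typing import Any


def conditional_violations(
    rows: list[dict[str, Any]], condition: dict[str, Any]
) -> list[int]:
    """Return indices of rows where the 'if' side matches but the 'then' side does not.

    Hypothetical helper for illustration only. Rows with a null (None) in
    either column are skipped rather than reported.
    """
    if_col = condition["if_column"]
    then_col = condition["then_column"]
    violations = []
    for index, row in enumerate(rows):
        if row[if_col] is None or row[then_col] is None:
            continue  # nulls never count as violations
        if_matches = row[if_col] == condition["if_value"]
        then_matches = row[then_col] == condition["then_value"]
        if if_matches and not then_matches:
            violations.append(index)
    return violations


rows = [
    {"status": "shipped", "paid": "yes"},  # condition met, expectation met
    {"status": "shipped", "paid": "no"},   # condition met, expectation violated
    {"status": "pending", "paid": "no"},   # condition not met, row ignored
    {"status": "shipped", "paid": None},   # null in then_column, row skipped
]
condition = {
    "if_column": "status",
    "if_value": "shipped",
    "then_column": "paid",
    "then_value": "yes",
}
print(conditional_violations(rows, condition))  # [1]
```

Only the second row is reported: it is the only one where the condition holds, both values are present, and the expected value is not found.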

Source code in lavendertown/rules/cross_column.py
class CrossColumnRule(CustomRule):
    """Base class for cross-column validation rules.

    Cross-column rules validate relationships between multiple columns in a
    DataFrame. They can check equality, comparisons, arithmetic operations,
    conditional logic, and referential integrity.

    Attributes:
        name: Human-readable rule name.
        description: Description of what the rule checks.
        source_columns: List of column names to check. At least 2 are required,
            except for "referential" rules, which need only 1.
        operation: Type of operation to perform. Valid options:
            - "equals": Column A must equal Column B
            - "greater_than": Column A must be greater than Column B
            - "less_than": Column A must be less than Column B
            - "sum_equals": Sum of source columns must equal target column
            - "conditional": If condition is met, then check another condition
            - "referential": Values in source column must exist in target column
        target_column: Optional target column for operations like sum_equals
            or referential integrity.
        condition: Optional dictionary for conditional rules containing:
            - "if_column": Column to check condition on
            - "if_value": Value to check for
            - "then_column": Column to validate if condition is met
            - "then_value": Expected value in then_column
    """

    def __init__(
        self,
        name: str,
        description: str,
        source_columns: list[str],
        operation: str,
        target_column: str | None = None,
        condition: dict[str, str] | None = None,
    ) -> None:
        """Initialize cross-column rule.

        Args:
            name: Human-readable rule name.
            description: Description of what the rule checks.
            source_columns: List of column names to check. Must have at least 2,
                except for "referential" rules, which need only 1.
            operation: Type of operation. Must be one of: "equals", "greater_than",
                "less_than", "sum_equals", "conditional", "referential".
            target_column: Optional target column for operations that require it.
            condition: Optional condition dictionary for conditional rules.

        Raises:
            ValueError: If source_columns has too few columns (fewer than 1 for
                "referential", fewer than 2 otherwise), if operation is invalid,
                or if a required target_column or condition is missing.
        """
        # Referential rules only need 1 source column
        if operation != "referential" and len(source_columns) < 2:
            raise ValueError(
                f"Cross-column rules require at least 2 columns, got {len(source_columns)}"
            )
        if operation == "referential" and len(source_columns) < 1:
            raise ValueError(
                f"Referential rules require at least 1 source column, got {len(source_columns)}"
            )

        valid_operations = {
            "equals",
            "greater_than",
            "less_than",
            "sum_equals",
            "conditional",
            "referential",
        }
        if operation not in valid_operations:
            raise ValueError(
                f"Operation must be one of {valid_operations}, got {operation}"
            )

        if operation in ["sum_equals", "referential"] and target_column is None:
            raise ValueError(
                f"Operation '{operation}' requires a target_column to be specified"
            )

        if operation == "conditional" and condition is None:
            raise ValueError("Operation 'conditional' requires a condition dictionary")

        super().__init__(
            name, description, column=None
        )  # Cross-column rules don't have a single column
        self.source_columns = source_columns
        self.operation = operation
        self.target_column = target_column
        self.condition = condition

    def check(self, df: object) -> list[GhostFinding]:
        """Check cross-column rule against DataFrame.

        Args:
            df: DataFrame to check. Can be pandas.DataFrame or polars.DataFrame.

        Returns:
            List of GhostFinding objects for rule violations. Returns empty list
            if no violations found. Returns a single error finding if columns
            don't exist or rule is misconfigured.
        """
        backend = detect_dataframe_backend(df)

        # Validate columns exist
        missing_columns = []
        for col in self.source_columns:
            if backend == "pandas":
                if col not in df.columns:  # type: ignore[attr-defined]
                    missing_columns.append(col)
            else:
                if col not in df.schema:  # type: ignore[attr-defined]
                    missing_columns.append(col)
        if self.target_column:
            if backend == "pandas":
                if self.target_column not in df.columns:  # type: ignore[attr-defined]
                    missing_columns.append(self.target_column)
            else:
                if self.target_column not in df.schema:  # type: ignore[attr-defined]
                    missing_columns.append(self.target_column)
        if self.condition:
            if_col = self.condition.get("if_column")
            then_col = self.condition.get("then_column")
            if if_col:
                if backend == "pandas":
                    if if_col not in df.columns:  # type: ignore[attr-defined]
                        missing_columns.append(if_col)
                else:
                    if if_col not in df.schema:  # type: ignore[attr-defined]
                        missing_columns.append(if_col)
            if then_col:
                if backend == "pandas":
                    if then_col not in df.columns:  # type: ignore[attr-defined]
                        missing_columns.append(then_col)
                else:
                    if then_col not in df.schema:  # type: ignore[attr-defined]
                        missing_columns.append(then_col)

        if missing_columns:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column="",
                    severity="error",
                    description=(
                        f"Rule '{self.name}': Missing columns: {', '.join(missing_columns)}"
                    ),
                    row_indices=None,
                    metadata={
                        "rule_name": self.name,
                        "missing_columns": missing_columns,
                    },
                )
            ]

        if backend == "pandas":
            return self._check_pandas(df)
        elif backend == "polars":
            return self._check_polars(df)
        else:
            raise ValueError(f"Unsupported backend: {backend}")

    def _check_pandas(self, df: object) -> list[GhostFinding]:
        """Check rule using Pandas API.

        Args:
            df: pandas.DataFrame to check.

        Returns:
            List of GhostFinding objects for violations.
        """

        if self.operation == "equals":
            return self._check_equals_pandas(df)
        elif self.operation == "greater_than":
            return self._check_greater_than_pandas(df)
        elif self.operation == "less_than":
            return self._check_less_than_pandas(df)
        elif self.operation == "sum_equals":
            return self._check_sum_equals_pandas(df)
        elif self.operation == "conditional":
            return self._check_conditional_pandas(df)
        elif self.operation == "referential":
            return self._check_referential_pandas(df)
        else:
            return []

    def _check_polars(self, df: object) -> list[GhostFinding]:
        """Check rule using Polars API.

        Args:
            df: polars.DataFrame to check.

        Returns:
            List of GhostFinding objects for violations.
        """

        if self.operation == "equals":
            return self._check_equals_polars(df)
        elif self.operation == "greater_than":
            return self._check_greater_than_polars(df)
        elif self.operation == "less_than":
            return self._check_less_than_polars(df)
        elif self.operation == "sum_equals":
            return self._check_sum_equals_polars(df)
        elif self.operation == "conditional":
            return self._check_conditional_polars(df)
        elif self.operation == "referential":
            return self._check_referential_polars(df)
        else:
            return []

    def _check_equals_pandas(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check if two columns are equal (Pandas)."""
        import pandas as pd

        df_pd: pd.DataFrame = df  # type: ignore[assignment]
        col1, col2 = self.source_columns[0], self.source_columns[1]
        violations = df_pd[col1] != df_pd[col2]
        violations = violations & df_pd[col1].notna() & df_pd[col2].notna()

        violation_indices = df_pd[violations].index.tolist()

        if len(violation_indices) > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=col1,  # Use first column as primary
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {len(violation_indices)} rows where "
                        f"'{col1}' != '{col2}'"
                    ),
                    row_indices=violation_indices,
                    metadata={
                        "rule_name": self.name,
                        "operation": "equals",
                        "columns": [col1, col2],
                        "violation_count": len(violation_indices),
                    },
                )
            ]
        return []

    def _check_equals_polars(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check if two columns are equal (Polars)."""
        import polars as pl

        col1, col2 = self.source_columns[0], self.source_columns[1]
        violations_df = df.filter(  # type: ignore[attr-defined]
            (pl.col(col1) != pl.col(col2))
            & pl.col(col1).is_not_null()
            & pl.col(col2).is_not_null()
        )

        violation_count = len(violations_df)

        if violation_count > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=col1,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {violation_count} rows where "
                        f"'{col1}' != '{col2}'"
                    ),
                    row_indices=None,
                    metadata={
                        "rule_name": self.name,
                        "operation": "equals",
                        "columns": [col1, col2],
                        "violation_count": violation_count,
                    },
                )
            ]
        return []

    def _check_greater_than_pandas(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check if col1 > col2 (Pandas)."""
        import pandas as pd

        df_pd: pd.DataFrame = df  # type: ignore[assignment]
        col1, col2 = self.source_columns[0], self.source_columns[1]
        violations = df_pd[col1] <= df_pd[col2]
        violations = violations & df_pd[col1].notna() & df_pd[col2].notna()

        violation_indices = df_pd[violations].index.tolist()

        if len(violation_indices) > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=col1,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {len(violation_indices)} rows where "
                        f"'{col1}' <= '{col2}'"
                    ),
                    row_indices=violation_indices,
                    metadata={
                        "rule_name": self.name,
                        "operation": "greater_than",
                        "columns": [col1, col2],
                        "violation_count": len(violation_indices),
                    },
                )
            ]
        return []

    def _check_greater_than_polars(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check if col1 > col2 (Polars)."""
        import polars as pl

        col1, col2 = self.source_columns[0], self.source_columns[1]
        violations_df = df.filter(  # type: ignore[attr-defined]
            (pl.col(col1) <= pl.col(col2))
            & pl.col(col1).is_not_null()
            & pl.col(col2).is_not_null()
        )

        violation_count = len(violations_df)

        if violation_count > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=col1,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {violation_count} rows where "
                        f"'{col1}' <= '{col2}'"
                    ),
                    row_indices=None,
                    metadata={
                        "rule_name": self.name,
                        "operation": "greater_than",
                        "columns": [col1, col2],
                        "violation_count": violation_count,
                    },
                )
            ]
        return []

    def _check_less_than_pandas(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check if col1 < col2 (Pandas)."""
        import pandas as pd

        df_pd: pd.DataFrame = df  # type: ignore[assignment]
        col1, col2 = self.source_columns[0], self.source_columns[1]
        violations = df_pd[col1] >= df_pd[col2]
        violations = violations & df_pd[col1].notna() & df_pd[col2].notna()

        violation_indices = df_pd[violations].index.tolist()

        if len(violation_indices) > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=col1,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {len(violation_indices)} rows where "
                        f"'{col1}' >= '{col2}'"
                    ),
                    row_indices=violation_indices,
                    metadata={
                        "rule_name": self.name,
                        "operation": "less_than",
                        "columns": [col1, col2],
                        "violation_count": len(violation_indices),
                    },
                )
            ]
        return []

    def _check_less_than_polars(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check if col1 < col2 (Polars)."""
        import polars as pl

        col1, col2 = self.source_columns[0], self.source_columns[1]
        violations_df = df.filter(  # type: ignore[attr-defined]
            (pl.col(col1) >= pl.col(col2))
            & pl.col(col1).is_not_null()
            & pl.col(col2).is_not_null()
        )

        violation_count = len(violations_df)

        if violation_count > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=col1,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {violation_count} rows where "
                        f"'{col1}' >= '{col2}'"
                    ),
                    row_indices=None,
                    metadata={
                        "rule_name": self.name,
                        "operation": "less_than",
                        "columns": [col1, col2],
                        "violation_count": violation_count,
                    },
                )
            ]
        return []

    def _check_sum_equals_pandas(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check if sum of source columns equals target column (Pandas)."""
        import pandas as pd

        df_pd: pd.DataFrame = df  # type: ignore[assignment]
        sum_cols = self.source_columns
        target_col = self.target_column

        if target_col is None:
            return []

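        # Calculate sum of source columns. skipna=False keeps NaN in the row sum,
        # so rows with a missing source value are skipped by the notna() filter
        # below, as in the Polars path where nulls propagate through the sum.
        sum_values = df_pd[sum_cols].sum(axis=1, skipna=False)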
        violations = sum_values != df_pd[target_col]
        violations = violations & sum_values.notna() & df_pd[target_col].notna()

        violation_indices = df_pd[violations].index.tolist()

        if len(violation_indices) > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=target_col,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {len(violation_indices)} rows where "
                        f"sum({', '.join(sum_cols)}) != '{target_col}'"
                    ),
                    row_indices=violation_indices,
                    metadata={
                        "rule_name": self.name,
                        "operation": "sum_equals",
                        "source_columns": sum_cols,
                        "target_column": target_col,
                        "violation_count": len(violation_indices),
                    },
                )
            ]
        return []

    def _check_sum_equals_polars(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check if sum of source columns equals target column (Polars)."""
        import polars as pl

        sum_cols = self.source_columns
        target_col = self.target_column

        if target_col is None:
            return []

        # Calculate sum
        sum_expr = sum([pl.col(col) for col in sum_cols])  # type: ignore[arg-type]
        violations_df = df.filter(  # type: ignore[attr-defined]
            (sum_expr != pl.col(target_col))  # type: ignore[union-attr]
            & sum_expr.is_not_null()  # type: ignore[union-attr]
            & pl.col(target_col).is_not_null()  # type: ignore[union-attr]
        )

        violation_count = len(violations_df)

        if violation_count > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=target_col,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {violation_count} rows where "
                        f"sum({', '.join(sum_cols)}) != '{target_col}'"
                    ),
                    row_indices=None,
                    metadata={
                        "rule_name": self.name,
                        "operation": "sum_equals",
                        "source_columns": sum_cols,
                        "target_column": target_col,
                        "violation_count": violation_count,
                    },
                )
            ]
        return []

    def _check_conditional_pandas(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check conditional rule: if col1 == X then col2 == Y (Pandas)."""
        import pandas as pd

        df_pd: pd.DataFrame = df  # type: ignore[assignment]

        if self.condition is None:
            return []

        if_col = self.condition.get("if_column")
        if_value = self.condition.get("if_value")
        then_col = self.condition.get("then_column")
        then_value = self.condition.get("then_value")

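        # Column names must be non-empty; values only need to be set, so falsy
        # values such as "" or 0 remain valid comparison targets
        if not if_col or not then_col or if_value is None or then_value is None:
            return []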

        # Find rows where condition is met
        condition_met = df_pd[if_col] == if_value
        condition_met = condition_met & df_pd[if_col].notna()

        # Check if then condition is violated
        violations = condition_met & (df_pd[then_col] != then_value)
        violations = violations & df_pd[then_col].notna()

        violation_indices = df_pd[violations].index.tolist()

        if len(violation_indices) > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=then_col if then_col else "",
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {len(violation_indices)} rows where "
                        f"'{if_col}' == '{if_value}' but '{then_col}' != '{then_value}'"
                    ),
                    row_indices=violation_indices,
                    metadata={
                        "rule_name": self.name,
                        "operation": "conditional",
                        "condition": self.condition,
                        "violation_count": len(violation_indices),
                    },
                )
            ]
        return []

    def _check_conditional_polars(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check conditional rule: if col1 == X then col2 == Y (Polars)."""
        import polars as pl

        if self.condition is None:
            return []

        if_col = self.condition.get("if_column")
        if_value = self.condition.get("if_value")
        then_col = self.condition.get("then_column")
        then_value = self.condition.get("then_value")

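        # Column names must be non-empty; values only need to be set, so falsy
        # values such as "" or 0 remain valid comparison targets
        if not if_col or not then_col or if_value is None or then_value is None:
            return []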

        # Find violations
        violations_df = df.filter(  # type: ignore[attr-defined]
            (pl.col(if_col if if_col else "") == if_value)
            & pl.col(if_col if if_col else "").is_not_null()
            & (pl.col(then_col if then_col else "") != then_value)
            & pl.col(then_col if then_col else "").is_not_null()
        )

        violation_count = len(violations_df)

        if violation_count > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=then_col if then_col else "",
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {violation_count} rows where "
                        f"'{if_col}' == '{if_value}' but '{then_col}' != '{then_value}'"
                    ),
                    row_indices=None,
                    metadata={
                        "rule_name": self.name,
                        "operation": "conditional",
                        "condition": self.condition,
                        "violation_count": violation_count,
                    },
                )
            ]
        return []

    def _check_referential_pandas(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check referential integrity: source column values must exist in target (Pandas)."""
        import pandas as pd

        df_pd: pd.DataFrame = df  # type: ignore[assignment]
        source_col = self.source_columns[0]
        target_col = self.target_column

        if target_col is None:
            return []

        # Get valid values from target column
        valid_values = set(df_pd[target_col].dropna().unique())

        # Find violations
        violations = ~df_pd[source_col].isin(valid_values)
        violations = violations & df_pd[source_col].notna()

        violation_indices = df_pd[violations].index.tolist()

        if len(violation_indices) > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=source_col,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {len(violation_indices)} rows where "
                        f"'{source_col}' values don't exist in '{target_col}'"
                    ),
                    row_indices=violation_indices,
                    metadata={
                        "rule_name": self.name,
                        "operation": "referential",
                        "source_column": source_col,
                        "target_column": target_col,
                        "violation_count": len(violation_indices),
                    },
                )
            ]
        return []

    def _check_referential_polars(self, df: object) -> list[GhostFinding]:  # type: ignore[type-arg]
        """Check referential integrity: source column values must exist in target (Polars)."""
        import polars as pl

        source_col = self.source_columns[0]
        target_col = self.target_column

        if target_col is None:
            return []

        # Get valid values
        valid_values = (
            df.select(pl.col(target_col).drop_nulls()).unique().to_series().to_list()  # type: ignore[attr-defined]
        )

        # Find violations
        violations_df = df.filter(  # type: ignore[attr-defined]
            ~pl.col(source_col).is_in(valid_values) & pl.col(source_col).is_not_null()
        )

        violation_count = len(violations_df)

        if violation_count > 0:
            return [
                GhostFinding(
                    ghost_type="rule",
                    column=source_col,
                    severity="warning",
                    description=(
                        f"Rule '{self.name}': {violation_count} rows where "
                        f"'{source_col}' values don't exist in '{target_col}'"
                    ),
                    row_indices=None,
                    metadata={
                        "rule_name": self.name,
                        "operation": "referential",
                        "source_column": source_col,
                        "target_column": target_col,
                        "violation_count": violation_count,
                    },
                )
            ]
        return []
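
The pairwise checks in the source above ("equals", "greater_than", "less_than") share one pattern: a row is a violation only when both values are present and the required relation fails. The following is a minimal pure-Python sketch of that pattern, assuming columns are plain lists and None represents a null. The names REQUIRED and pairwise_violations are hypothetical and do not appear in lavendertown.

```python
import operator
from typing import Any, Callable

# Relation that must hold between the first and second source column
REQUIRED: dict[str, Callable[[Any, Any], bool]] = {
    "equals": operator.eq,
    "greater_than": operator.gt,
    "less_than": operator.lt,
}


def pairwise_violations(op: str, col1: list[Any], col2: list[Any]) -> list[int]:
    """Return row indices where both values are present and the relation fails."""
    holds = REQUIRED[op]
    return [
        i
        for i, (a, b) in enumerate(zip(col1, col2))
        if a is not None and b is not None and not holds(a, b)
    ]


start = [1, 5, None, 7]
end = [2, 5, 3, 4]
print(pairwise_violations("less_than", start, end))  # [1, 3]
```

Row 1 fails because 5 is not strictly less than 5, row 3 fails because 7 is greater than 4, and row 2 is skipped because of the null. In the source above, only the pandas branches report row indices; the Polars branches set row_indices=None and report a count.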

Functions

__init__

__init__(
    name,
    description,
    source_columns,
    operation,
    target_column=None,
    condition=None,
)

Parameters:

name (str, required): Human-readable rule name.

description (str, required): Description of what the rule checks.

source_columns (list[str], required): List of column names to check. Must have at least 2, except for "referential" rules, which need only 1.

operation (str, required): Type of operation. Must be one of: "equals", "greater_than", "less_than", "sum_equals", "conditional", "referential".

target_column (str | None, default None): Target column. Required for "sum_equals" and "referential".

condition (dict[str, str] | None, default None): Condition dictionary. Required for "conditional" rules.

Raises:

ValueError: If source_columns has too few columns (fewer than 1 for "referential", fewer than 2 otherwise), if operation is invalid, if target_column is missing for "sum_equals" or "referential", or if condition is missing for "conditional".
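
The argument combinations that raise can be sketched as a standalone function. This is a hypothetical stand-in, not the library's code: validate_rule_args and VALID_OPERATIONS are illustrative names, and the error messages are shortened. The checks run in the same order as in the constructor source below.

```python
VALID_OPERATIONS = {
    "equals",
    "greater_than",
    "less_than",
    "sum_equals",
    "conditional",
    "referential",
}


def validate_rule_args(source_columns, operation, target_column=None, condition=None):
    """Hypothetical mirror of the constructor's argument checks."""
    # Referential rules need only one source column; all others need two
    minimum = 1 if operation == "referential" else 2
    if len(source_columns) < minimum:
        raise ValueError(
            f"need at least {minimum} source column(s), got {len(source_columns)}"
        )
    if operation not in VALID_OPERATIONS:
        raise ValueError(f"unknown operation: {operation}")
    if operation in ("sum_equals", "referential") and target_column is None:
        raise ValueError(f"'{operation}' requires a target_column")
    if operation == "conditional" and condition is None:
        raise ValueError("'conditional' requires a condition dictionary")


errors = []
for args in [
    # Valid: one source column is enough for a referential rule
    dict(source_columns=["customer_id"], operation="referential", target_column="known_id"),
    # Invalid: sum_equals without a target column
    dict(source_columns=["subtotal", "tax"], operation="sum_equals"),
    # Invalid: a comparison needs two source columns
    dict(source_columns=["start"], operation="less_than"),
]:
    try:
        validate_rule_args(**args)
    except ValueError as exc:
        errors.append(str(exc))

print(errors)
```

Because the column-count check comes first, an unknown operation passed with a single column is reported as a column-count problem rather than as an invalid operation.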

Source code in lavendertown/rules/cross_column.py
def __init__(
    self,
    name: str,
    description: str,
    source_columns: list[str],
    operation: str,
    target_column: str | None = None,
    condition: dict[str, str] | None = None,
) -> None:
    """Initialize cross-column rule.

    Args:
        name: Human-readable rule name.
        description: Description of what the rule checks.
        source_columns: List of column names to check. Must have at least 2,
            except for "referential" rules, which need only 1.
        operation: Type of operation. Must be one of: "equals", "greater_than",
            "less_than", "sum_equals", "conditional", "referential".
        target_column: Optional target column for operations that require it.
        condition: Optional condition dictionary for conditional rules.

    Raises:
        ValueError: If source_columns has too few columns (fewer than 1 for
            "referential", fewer than 2 otherwise), if operation is invalid,
            or if a required target_column or condition is missing.
    """
    # Referential rules only need 1 source column
    if operation != "referential" and len(source_columns) < 2:
        raise ValueError(
            f"Cross-column rules require at least 2 columns, got {len(source_columns)}"
        )
    if operation == "referential" and len(source_columns) < 1:
        raise ValueError(
            f"Referential rules require at least 1 source column, got {len(source_columns)}"
        )

    valid_operations = {
        "equals",
        "greater_than",
        "less_than",
        "sum_equals",
        "conditional",
        "referential",
    }
    if operation not in valid_operations:
        raise ValueError(
            f"Operation must be one of {valid_operations}, got {operation}"
        )

    if operation in ["sum_equals", "referential"] and target_column is None:
        raise ValueError(
            f"Operation '{operation}' requires a target_column to be specified"
        )

    if operation == "conditional" and condition is None:
        raise ValueError("Operation 'conditional' requires a condition dictionary")

    super().__init__(
        name, description, column=None
    )  # Cross-column rules don't have a single column
    self.source_columns = source_columns
    self.operation = operation
    self.target_column = target_column
    self.condition = condition

check

check(df)

Check cross-column rule against DataFrame.

Parameters:

Name Type Description Default
df object

DataFrame to check. Can be pandas.DataFrame or polars.DataFrame.

required

Returns:

Type Description
list[GhostFinding]

List of GhostFinding objects for rule violations. Returns an empty list if no violations are found. Returns a single error finding if columns don't exist or the rule is misconfigured.
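Before dispatching to a backend, check gathers every column the rule references and reports the ones the DataFrame lacks in a single error finding. The sketch below shows that collection step on plain Python lists. `find_missing_columns` is a hypothetical helper, not a lavendertown function, and `available` stands in for `df.columns` (pandas) or `df.schema` (polars).

```python
def find_missing_columns(available, source_columns, target_column=None, condition=None):
    """Hypothetical helper: referenced columns absent from `available`, in check order."""
    # Source columns are checked first, then the target column,
    # then the two column names inside the condition dictionary.
    referenced = list(source_columns)
    if target_column:
        referenced.append(target_column)
    if condition:
        for key in ("if_column", "then_column"):
            col = condition.get(key)
            if col:
                referenced.append(col)
    return [col for col in referenced if col not in available]


available = ["start", "end", "status"]

# Every referenced column exists, so nothing is reported.
complete = find_missing_columns(available, ["start", "end"])

# "finish", "total", and "closed_at" are absent; "status" is present.
missing = find_missing_columns(
    available,
    ["start", "finish"],
    target_column="total",
    condition={
        "if_column": "status",
        "if_value": "closed",
        "then_column": "closed_at",
        "then_value": "set",
    },
)
```

In the source listing, a non-empty result becomes one GhostFinding with severity "error" and the list stored under `metadata["missing_columns"]`, and the backend-specific check is skipped.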

Source code in lavendertown/rules/cross_column.py
def check(self, df: object) -> list[GhostFinding]:
    """Check cross-column rule against DataFrame.

    Args:
        df: DataFrame to check. Can be pandas.DataFrame or polars.DataFrame.

    Returns:
        List of GhostFinding objects for rule violations. Returns empty list
        if no violations found. Returns a single error finding if columns
        don't exist or rule is misconfigured.
    """
    backend = detect_dataframe_backend(df)

    # Validate columns exist
    missing_columns = []
    for col in self.source_columns:
        if backend == "pandas":
            if col not in df.columns:  # type: ignore[attr-defined]
                missing_columns.append(col)
        else:
            if col not in df.schema:  # type: ignore[attr-defined]
                missing_columns.append(col)
    if self.target_column:
        if backend == "pandas":
            if self.target_column not in df.columns:  # type: ignore[attr-defined]
                missing_columns.append(self.target_column)
        else:
            if self.target_column not in df.schema:  # type: ignore[attr-defined]
                missing_columns.append(self.target_column)
    if self.condition:
        if_col = self.condition.get("if_column")
        then_col = self.condition.get("then_column")
        if if_col:
            if backend == "pandas":
                if if_col not in df.columns:  # type: ignore[attr-defined]
                    missing_columns.append(if_col)
            else:
                if if_col not in df.schema:  # type: ignore[attr-defined]
                    missing_columns.append(if_col)
        if then_col:
            if backend == "pandas":
                if then_col not in df.columns:  # type: ignore[attr-defined]
                    missing_columns.append(then_col)
            else:
                if then_col not in df.schema:  # type: ignore[attr-defined]
                    missing_columns.append(then_col)

    if missing_columns:
        return [
            GhostFinding(
                ghost_type="rule",
                column="",
                severity="error",
                description=(
                    f"Rule '{self.name}': Missing columns: {', '.join(missing_columns)}"
                ),
                row_indices=None,
                metadata={
                    "rule_name": self.name,
                    "missing_columns": missing_columns,
                },
            )
        ]

    if backend == "pandas":
        return self._check_pandas(df)
    elif backend == "polars":
        return self._check_polars(df)
    else:
        raise ValueError(f"Unsupported backend: {backend}")