Skip to content

Core

polars_labrodq_extension.polars_ns.dataframe.DataFrameDQNamespace

Polars DataFrame namespace for data-quality checks.

After importing this module (or labrodq that imports it), every pl.DataFrame instance will have a .ldq property:

df.ldq.run_checks(...)
df.ldq.run_yaml("dq/test.yml")
df.ldq.quality_report_yaml("dq/test.yml")

This is a minimum viable product (MVP) implementation.

Source code in src/polars_labrodq_extension/polars_ns/dataframe.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@pl.api.register_dataframe_namespace("ldq")
class DataFrameDQNamespace:
    """
    Polars DataFrame namespace for data-quality checks.

    After importing this module (or labrodq that imports it),
    every `pl.DataFrame` instance will have a `.ldq` property:

        df.ldq.run_checks(...)
        df.ldq.run_yaml("dq/test.yml")
        df.ldq.quality_report_yaml("dq/test.yml")

    Problems with a check definition (unsupported type, missing column,
    unusable threshold) are reported as failed results instead of being
    raised, so one bad definition does not abort the remaining checks.

    This is a minimal MVP implementation.
    """

    def __init__(self, df: pl.DataFrame) -> None:
        # The DataFrame every check in this namespace is evaluated against.
        self._df: pl.DataFrame = df

    # -------- Public API --------

    def run_checks(self, checks: Iterable[CheckDef]) -> List[CheckResult]:
        """
        Evaluate every check against the wrapped DataFrame.

        Supported check types are ``"not_null"`` and ``"max_null_ratio"``;
        any other type yields a failed result with an explanatory message.

        Returns one `CheckResult` per check, in input order.
        """
        return [self._evaluate(check) for check in checks]

    def quality_report(self, checks: Iterable[CheckDef]) -> pl.DataFrame:
        """
        Run checks and return a tabular Polars DataFrame with results.

        The frame has one row per check and the columns ``check``, ``type``,
        ``column``, ``level``, ``passed`` and ``message``. The per-check
        ``details`` mapping is not included.
        """
        results = self.run_checks(checks)

        return pl.DataFrame(
            {
                "check": [r.name for r in results],
                "type": [r.type for r in results],
                "column": [r.column for r in results],
                "level": [r.level.value for r in results],
                "passed": [r.passed for r in results],
                "message": [r.message for r in results],
            }
        )

    # --- YAML helpers ---

    def run_yaml(self, path: str) -> List[CheckResult]:
        """
        Load checks from a YAML file and execute them.
        """
        return self.run_checks(load_checks_from_yaml(path))

    def quality_report_yaml(self, path: str) -> pl.DataFrame:
        """
        Shortcut: load checks from YAML and return a quality report as DataFrame.
        """
        return self.quality_report(load_checks_from_yaml(path))

    # -------- Internal helpers --------

    @staticmethod
    def _result(
        check: CheckDef, *, passed: bool, message: str, details: dict
    ) -> CheckResult:
        """Build a `CheckResult` that echoes the identifying fields of `check`."""
        return CheckResult(
            name=check.name,
            type=check.type,
            column=check.column,
            level=check.level,
            passed=passed,
            message=message,
            details=details,
        )

    def _missing_column(self, check: CheckDef, **extra_details) -> CheckResult:
        """Build the failed result used when the check's column is absent."""
        return self._result(
            check,
            passed=False,
            message=f"Column {check.column!r} not found",
            details={"column_exists": False, **extra_details},
        )

    def _evaluate(self, check: CheckDef) -> CheckResult:
        """Route a single check to the evaluator for its type."""
        if check.type == "not_null":
            return self._eval_not_null(check)
        if check.type == "max_null_ratio":
            return self._eval_max_null_ratio(check)
        return self._result(
            check,
            passed=False,
            message=f"Unsupported check type: {check.type!r}",
            details={},
        )

    # -------- Internal helpers for concrete check types --------

    def _eval_not_null(self, check: CheckDef) -> CheckResult:
        """
        Pass when the check's column contains no null values.

        The result details carry ``null_count``, ``row_count`` and
        ``null_ratio``; the ratio is ``None`` for an empty DataFrame.
        """
        col = check.column
        if col not in self._df.columns:
            return self._missing_column(check)

        row_count = self._df.height
        null_count = self._df[col].null_count()
        passed = null_count == 0

        if passed:
            msg = f"Column {col!r} has no nulls"
        else:
            msg = f"Column {col!r} has {null_count} null values"

        return self._result(
            check,
            passed=passed,
            message=msg,
            details={
                "null_count": null_count,
                "row_count": row_count,
                "null_ratio": null_count / row_count if row_count > 0 else None,
            },
        )

    def _eval_max_null_ratio(self, check: CheckDef) -> CheckResult:
        """
        Pass when the share of nulls in the column is at most the threshold.

        The threshold is read from ``check.params["threshold"]`` and defaults
        to ``0.0``. A value that cannot be converted to ``float`` produces a
        failed result instead of raising. An empty DataFrame is treated as
        having a null ratio of ``0.0``.
        """
        col = check.column
        params = check.params or {}
        raw_threshold = params.get("threshold", 0.0)

        try:
            threshold = float(raw_threshold)
        except (TypeError, ValueError):
            # A misconfigured check is a finding to report, like an unknown
            # check type, rather than a reason to abort the whole run.
            return self._result(
                check,
                passed=False,
                message=(
                    f"Invalid threshold {raw_threshold!r} "
                    f"for check type {check.type!r}"
                ),
                details={"threshold": raw_threshold},
            )

        if col not in self._df.columns:
            return self._missing_column(check, threshold=threshold)

        row_count = self._df.height
        null_count = self._df[col].null_count()
        # Guard the division: an empty frame counts as a ratio of 0.0.
        null_ratio = null_count / row_count if row_count > 0 else 0.0

        passed = null_ratio <= threshold
        relation = "<=" if passed else ">"
        msg = (
            f"Null ratio for column {col!r} is {null_ratio:.4f} "
            f"({relation} {threshold:.4f})"
        )

        return self._result(
            check,
            passed=passed,
            message=msg,
            details={
                "row_count": row_count,
                "null_count": null_count,
                "null_ratio": null_ratio,
                "threshold": threshold,
            },
        )

quality_report(checks)

Run checks and return a tabular Polars DataFrame with results.

Source code in src/polars_labrodq_extension/polars_ns/dataframe.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def quality_report(self, checks: Iterable[CheckDef]) -> pl.DataFrame:
    """
    Execute the given checks and tabulate their outcomes.

    Returns a Polars DataFrame with one row per check and the columns
    ``check``, ``type``, ``column``, ``level``, ``passed`` and ``message``.
    """
    outcomes = self.run_checks(checks)

    # Build the table column by column; key order fixes the column order.
    table = dict(
        check=[outcome.name for outcome in outcomes],
        type=[outcome.type for outcome in outcomes],
        column=[outcome.column for outcome in outcomes],
        level=[outcome.level.value for outcome in outcomes],
        passed=[outcome.passed for outcome in outcomes],
        message=[outcome.message for outcome in outcomes],
    )
    return pl.DataFrame(table)

quality_report_yaml(path)

Shortcut: load checks from YAML and return a quality report as DataFrame.

Source code in src/polars_labrodq_extension/polars_ns/dataframe.py
80
81
82
83
84
85
def quality_report_yaml(self, path: str) -> pl.DataFrame:
    """
    Convenience wrapper: read the check definitions from the YAML file at
    `path` and return the resulting quality report as a DataFrame.
    """
    return self.quality_report(load_checks_from_yaml(path))

run_checks(checks)

Source code in src/polars_labrodq_extension/polars_ns/dataframe.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def run_checks(self, checks: Iterable[CheckDef]) -> List[CheckResult]:
    """
    Evaluate each check and collect the outcomes in input order.

    ``"not_null"`` and ``"max_null_ratio"`` checks are delegated to their
    evaluators; any other type produces a failed result whose message names
    the unsupported type.
    """

    def evaluate(check):
        kind = check.type
        if kind == "not_null":
            return self._eval_not_null(check)
        if kind == "max_null_ratio":
            return self._eval_max_null_ratio(check)
        # Unknown types are reported as failures, not raised.
        return CheckResult(
            name=check.name,
            type=kind,
            column=check.column,
            level=check.level,
            passed=False,
            message=f"Unsupported check type: {kind!r}",
            details={},
        )

    return [evaluate(check) for check in checks]

run_yaml(path)

Load checks from a YAML file and execute them.

Source code in src/polars_labrodq_extension/polars_ns/dataframe.py
73
74
75
76
77
78
def run_yaml(self, path: str) -> List[CheckResult]:
    """
    Read the check definitions from the YAML file at `path` and run them.
    """
    return self.run_checks(load_checks_from_yaml(path))