dfguard
==========
The lightest way to enforce DataFrame schema checks in Python, using type
annotations. Supports pandas, Polars, and PySpark.

**dfguard rejects the wrong DataFrame at the function call** with a precise
error: which function, which argument, what schema was expected, and what
arrived.

Enforcement is pure metadata inspection: no data is scanned and no Spark jobs
are triggered. Unlike `pandera <https://pandera.readthedocs.io/>`_, which
introduces its own type system, or
`Great Expectations <https://greatexpectations.io/>`_, which scans actual data
and requires significant setup, dfguard uses the types your library already
ships with, such as ``T.LongType()`` for PySpark, ``pl.Int64`` for Polars, or
``np.dtype("int64")`` for pandas.

Explicitly calling validation at every stage peppers your codebase with
boilerplate. Place one ``dfg.arm()`` call in your package entry point and every
function with a schema-annotated DataFrame argument is enforced automatically.
Use ``@dfg.enforce`` on individual functions for explicit per-function control.
By default, declared columns must be present with correct types and extra
columns are allowed. Pass ``subset=False`` to require an exact match.
Compatibility
-------------

.. list-table::
   :header-rows: 1
   :widths: 30 25 25

   * - Backend
     - Version
     - Python
   * - PySpark
     - >= 3.3
     - >= 3.10
   * - pandas
     - >= 1.5
     - >= 3.10
   * - Polars
     - >= 0.20
     - >= 3.10

----
.. tab-set::

   .. tab-item:: PySpark
      :sync: pyspark

      .. code-block:: python

         import dfguard.pyspark as dfg
         from pyspark.sql import SparkSession, functions as F, types as T

         spark = SparkSession.builder.getOrCreate()

         raw_df = spark.createDataFrame(
             [(1, 10.0, 3), (2, 5.0, 7)],
             "order_id LONG, amount DOUBLE, quantity INT",
         )

         # Option A: arm() -- covers the whole package, no decorator on each function
         # Place dfg.arm() in my_pipeline/__init__.py
         dfg.arm()

         class RawSchema(dfg.SparkSchema):
             order_id = T.LongType()
             amount = T.DoubleType()
             quantity = T.IntegerType()

         def enrich(df: RawSchema):  # enforced by arm()
             return df.withColumn("revenue", F.col("amount") * F.col("quantity"))

         # captures schema of the returned DataFrame
         EnrichedSchema = dfg.schema_of(enrich(raw_df))

         @dfg.enforce(subset=False)  # exact match: no extra columns allowed
         def flag_high_value(df: EnrichedSchema):
             return df.withColumn("is_vip", F.col("revenue") > 1000)

         flag_high_value(raw_df)
         # TypeError: Schema mismatch in flag_high_value() argument 'df':
         #   expected: order_id:bigint, amount:double, quantity:int, revenue:double
         #   received: order_id:bigint, amount:double, quantity:int
   .. tab-item:: pandas
      :sync: pandas

      .. code-block:: python

         import numpy as np
         import pandas as pd

         import dfguard.pandas as dfg

         raw_df = pd.DataFrame({
             "order_id": pd.array([1, 2, 3], dtype="int64"),
             "amount": pd.array([10.0, 5.0, 8.5], dtype="float64"),
             "quantity": pd.array([3, 1, 2], dtype="int64"),
         })

         # Option A: arm() -- covers the whole package, no decorator on each function
         dfg.arm()

         class RawSchema(dfg.PandasSchema):
             order_id = np.dtype("int64")
             amount = np.dtype("float64")
             quantity = np.dtype("int64")

         def enrich(df: RawSchema):  # enforced by arm()
             return df.assign(revenue=df["amount"] * df["quantity"])

         # captures schema of the returned DataFrame
         EnrichedSchema = dfg.schema_of(enrich(raw_df))

         @dfg.enforce(subset=False)  # exact match: no extra columns allowed
         def flag_high_value(df: EnrichedSchema):
             return df.assign(is_vip=df["revenue"] > 1000)

         flag_high_value(raw_df)
         # TypeError: Schema mismatch in flag_high_value() argument 'df':
         #   expected: order_id:int64, amount:float64, quantity:int64, revenue:float64
         #   received: order_id:int64, amount:float64, quantity:int64
   .. tab-item:: Polars
      :sync: polars

      .. code-block:: python

         import polars as pl

         import dfguard.polars as dfg

         raw_df = pl.DataFrame({
             "order_id": pl.Series([1, 2, 3], dtype=pl.Int64),
             "amount": pl.Series([10.0, 5.0, 8.5], dtype=pl.Float64),
             "quantity": pl.Series([3, 1, 2], dtype=pl.Int32),
         })

         # Option A: arm() -- covers the whole package, no decorator on each function
         dfg.arm()

         class RawSchema(dfg.PolarsSchema):
             order_id = pl.Int64
             amount = pl.Float64
             quantity = pl.Int32

         def enrich(df: RawSchema) -> pl.DataFrame:  # enforced by arm()
             return df.with_columns(revenue=pl.col("amount") * pl.col("quantity"))

         # captures schema of the returned DataFrame
         EnrichedSchema = dfg.schema_of(enrich(raw_df))

         @dfg.enforce(subset=False)  # exact match: no extra columns allowed
         def flag_high_value(df: EnrichedSchema) -> pl.DataFrame:
             return df.with_columns(is_vip=pl.col("revenue") > 1000)

         flag_high_value(raw_df)
         # TypeError: Schema mismatch in flag_high_value() argument 'df':
         #   expected: order_id:Int64, amount:Float64, quantity:Int32, revenue:Float64
         #   received: order_id:Int64, amount:Float64, quantity:Int32
No validation logic inside the functions. The wrong DataFrame simply cannot
enter the wrong function.

Call ``dfg.arm()`` once from your package ``__init__.py`` to protect the whole
package; no decorator is needed on each function. See the :doc:`quickstart`.
Two ways to define a schema
---------------------------

**Option A: Capture from a live DataFrame**

.. code-block:: python

   RawSchema = dfg.schema_of(raw_df)
   EnrichedSchema = dfg.schema_of(enriched_df)

Useful for quick scripts and existing code where you already have a DataFrame.
No boilerplate. The schema is locked to that DataFrame's shape at that moment.
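A capture helper of this shape can be approximated in a few lines of plain pandas. The ``schema_of`` below is an illustrative stand-in, not dfguard's internals; note that it reads only dtype metadata, never row data:

```python
import numpy as np
import pandas as pd


def schema_of(df, name="CapturedSchema"):
    # Freeze the DataFrame's current column -> dtype mapping into a class.
    # Only df.dtypes (metadata) is inspected; no rows are touched.
    return type(name, (), dict(df.dtypes))


raw_df = pd.DataFrame({
    "order_id": np.array([1, 2], dtype="int64"),
    "amount": np.array([10.0, 5.0], dtype="float64"),
})
RawSchema = schema_of(raw_df)
```

Because the result is an ordinary class with one dtype attribute per column, it is interchangeable with a hand-declared schema class.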
**Option B: Declare upfront as a class**

.. tab-set::

   .. tab-item:: PySpark
      :sync: pyspark

      .. code-block:: python

         from pyspark.sql import types as T

         import dfguard.pyspark as dfg
         from dfguard.pyspark import Optional

         class OrderSchema(dfg.SparkSchema):
             order_id = T.LongType()
             amount = T.DoubleType()
             line_items = T.ArrayType(T.StructType([  # array of structs
                 T.StructField("sku", T.StringType()),
                 T.StructField("quantity", T.IntegerType()),
                 T.StructField("price", T.DoubleType()),
             ]))
             zip_code = Optional[T.StringType()]  # nullable field

         class EnrichedSchema(OrderSchema):  # inherits all parent fields
             revenue = T.DoubleType()
   .. tab-item:: pandas
      :sync: pandas

      .. code-block:: python

         import numpy as np
         import pandas as pd
         import pyarrow as pa

         import dfguard.pandas as dfg
         from dfguard.pandas import Optional

         class OrderSchema(dfg.PandasSchema):
             order_id = np.dtype("int64")
             amount = np.dtype("float64")
             line_items = pd.ArrowDtype(pa.list_(pa.struct([  # nested via PyArrow
                 pa.field("sku", pa.string()),
                 pa.field("quantity", pa.int32()),
                 pa.field("price", pa.float64()),
             ])))
             zip_code = Optional[pd.StringDtype()]  # nullable field

         class EnrichedSchema(OrderSchema):  # inherits all parent fields
             revenue = np.dtype("float64")
   .. tab-item:: Polars
      :sync: polars

      .. code-block:: python

         import polars as pl

         import dfguard.polars as dfg
         from dfguard.polars import Optional

         class OrderSchema(dfg.PolarsSchema):
             order_id = pl.Int64
             amount = pl.Float64
             line_items = pl.List(pl.Struct({  # list of structs
                 "sku": pl.String,
                 "quantity": pl.Int32,
                 "price": pl.Float64,
             }))
             zip_code = Optional[pl.String]  # nullable field

         class EnrichedSchema(OrderSchema):  # inherits all parent fields
             revenue = pl.Float64

No live DataFrame needed. Subclasses inherit parent fields. Supports complex
nested types. The schema class is a regular Python class: go-to-definition and
class-level navigation work in your IDE.
**For data pipelines, Option B is preferred.** Schemas are defined once, shared
across modules, visible in version control, and discoverable by your IDE.
Option A is convenient for exploration, or when adding dfguard to existing
code you do not want to change.
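The inheritance behavior described above (subclasses picking up parent fields, with the option to override them) falls out of a walk over the class MRO. The ``declared_fields`` helper below is hypothetical, sketching the idea in plain Python rather than showing dfguard's API:

```python
import numpy as np


def declared_fields(schema_cls):
    # Walk base classes first so a subclass can override a parent's dtype.
    fields = {}
    for klass in reversed(schema_cls.__mro__):
        for name, value in vars(klass).items():
            if not name.startswith("_"):  # skip dunders and machinery
                fields[name] = value
    return fields


class OrderSchema:
    order_id = np.dtype("int64")
    amount = np.dtype("float64")


class EnrichedSchema(OrderSchema):  # inherits order_id and amount
    revenue = np.dtype("float64")
```

Collecting fields this way is what makes a schema class behave like an ordinary Python class in the IDE: the fields are just class attributes.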
See the :doc:`quickstart` for the full walkthrough.
.. toctree::
   :maxdepth: 1
   :caption: User Guide

   self
   quickstart
   types
   pipelines
   airflow
   kedro

.. toctree::
   :maxdepth: 1
   :caption: API Reference

   api/schemas
   api/enforcement
   api/dataset
   api/history
   api/exceptions