Using dfguard in Kedro

Kedro is a pipeline framework that structures data projects into nodes, pipelines, and a data catalog. A node is a pure Python function; Kedro wires inputs and outputs through the catalog.

dfguard fits naturally into Kedro. Schemas live in the node files alongside the functions that use them. dfg.arm() in settings.py arms the entire package from one place, so node functions need no @dfg.enforce decorator.

The working example lives at examples/kedro/ in the dfguard repository.

File structure

kedro/
├── requirements.txt
├── pyproject.toml
├── conf/base/
│   └── catalog.yml
├── data/
│   └── raw_orders.csv
└── src/orders_pipeline/
    ├── __init__.py
    ├── pipeline_registry.py
    ├── schemas.py                        # shared SparkSchema definitions
    └── pipelines/processing/
        ├── __init__.py
        ├── nodes.py                      # node functions, enforced by dfg.arm()
        └── pipeline.py                   # pipeline assembly

schemas.py

Define schema contracts in one file and import them wherever needed. This is the single source of truth for what each stage of the pipeline produces and consumes.

# src/orders_pipeline/schemas.py
import dfguard.pyspark as dfg
from pyspark.sql import types as T

class RawOrderSchema(dfg.SparkSchema):
    order_id:    T.LongType()
    customer_id: T.LongType()
    amount:      T.DoubleType()
    quantity:    T.IntegerType()
    status:      T.StringType()

class EnrichedOrderSchema(RawOrderSchema):
    revenue:       T.DoubleType()
    is_high_value: T.BooleanType()

nodes.py

Each node is a plain Python function with a schema-annotated argument. No @dfg.enforce decorator needed; dfg.arm() in settings.py handles that for the whole package. No validation logic inside the function body.

# src/orders_pipeline/pipelines/processing/nodes.py
from pyspark.sql import functions as F
from orders_pipeline.schemas import EnrichedOrderSchema, RawOrderSchema

# No @dfg.enforce needed; dfg.arm() in settings.py covers the whole package.
# The type annotations on the arguments are the contract.

def enrich_orders(raw: RawOrderSchema):
    return (
        raw
        .withColumn("revenue", F.col("amount") * F.col("quantity"))
        .withColumn("is_high_value", F.col("revenue") > 500.0)
    )

def summarise_by_customer(enriched: EnrichedOrderSchema):
    return (
        enriched
        .groupBy("customer_id")
        .agg(
            F.sum("revenue").alias("total_revenue"),
            F.count("*").alias("order_count"),
        )
    )

pipeline.py

# src/orders_pipeline/pipelines/processing/pipeline.py
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import enrich_orders, summarise_by_customer

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(
            func=enrich_orders,
            inputs="raw_orders",
            outputs="enriched_orders",
            name="enrich_orders_node",
        ),
        node(
            func=summarise_by_customer,
            inputs="enriched_orders",
            outputs="customer_summary",
            name="summarise_node",
        ),
    ])

catalog.yml

# conf/base/catalog.yml
raw_orders:
  type: spark.SparkDataset
  filepath: data/raw_orders.csv
  file_format: csv
  load_args:
    header: true
    inferSchema: true

enriched_orders:
  type: spark.SparkDataset
  filepath: data/enriched_orders.parquet
  file_format: parquet

customer_summary:
  type: spark.SparkDataset
  filepath: data/customer_summary.parquet
  file_format: parquet

Running the pipeline

cd examples/kedro
pip install -r requirements.txt
pip install -e src/
kedro run

# Run a specific pipeline
kedro run --pipeline processing

# Run a single node
kedro run --node enrich_orders_node

What a schema error looks like

If the catalog wires raw_orders directly into summarise_by_customer (missing the enrich step), Kedro propagates the node failure with the full dfguard message:

TypeError: Schema mismatch in summarise_by_customer() argument 'enriched':
  expected: order_id:bigint, customer_id:bigint, amount:double,
            quantity:int, status:string, revenue:double, is_high_value:boolean
  received: order_id:bigint, customer_id:bigint, amount:double,
            quantity:int, status:string

If the CSV source adds an unexpected column or changes a type, the error appears at the first node that touches that column, not deep in a shuffle or aggregation.

Where to define schemas

Put them in schemas.py at the package root. Node files import from there. This keeps schema definitions separate from transformation logic and gives you one place to update when the upstream contract changes.

For very large projects with many pipelines, a schemas/ directory with one file per domain works well:

src/orders_pipeline/schemas/
├── __init__.py   # re-exports all schema classes
├── orders.py
├── customers.py
└── products.py

Using @dfg.enforce instead of dfg.arm()

If you prefer explicit decoration over package-wide arming, add @dfg.enforce to individual node functions instead of calling dfg.arm() in settings.py. Both approaches work; the choice is style:

# src/orders_pipeline/hooks.py
import dfguard.pyspark as dfg
from kedro.framework.hooks import hook_impl

class FrameguardHook:
    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        dfg.arm(package="orders_pipeline.pipelines")
# src/orders_pipeline/pipelines/processing/nodes.py  (with explicit decoration)
import dfguard.pyspark as dfg
from pyspark.sql import functions as F
from orders_pipeline.schemas import EnrichedOrderSchema, RawOrderSchema

@dfg.enforce
def enrich_orders(raw: RawOrderSchema):
    return raw.withColumn("revenue", F.col("amount") * F.col("quantity"))

@dfg.enforce
def summarise_by_customer(enriched: EnrichedOrderSchema):
    return enriched.groupBy("customer_id").agg(
        F.sum("revenue").alias("total_revenue"),
        F.count("*").alias("order_count"),
    )

dfg.arm() in settings.py: one call, whole package covered, no decorator on each function.

@dfg.enforce per function: explicit, visible at the function, easier to reason about in isolation.