Data Pipeline Usage

How you use dfguard depends on your pipeline framework, because Kedro and Airflow move DataFrames through a pipeline in different ways. This page explains what to reach for in each case.

Quick reference

  • Enforce one function: @dfg.enforce

  • Enforce an entire package from one place: dfg.arm() in the package's __init__.py (in Kedro, call it from settings.py instead; see Using dfguard in Kedro)

  • Validate a DataFrame right after loading: MySchema.assert_valid(df)

  • Turn off enforcement in tests or CI: dfg.disarm()

One thing to know before you start

In Kedro, DataFrames come from the catalog at runtime. You cannot call dfg.schema_of(raw_df) at module level because raw_df does not exist when the module is imported. Use dfg.SparkSchema for all Kedro nodes.

dfg.schema_of(df) is still useful in scripts and notebooks where you have a live DataFrame in hand.


Kedro

Kedro is a pipeline framework that structures projects into nodes, pipelines, and a data catalog. See Using dfguard in Kedro for a full working example with runnable code.

The recommended structure: define a dfg.SparkSchema for every node schema alongside the functions that use it, and call dfg.arm() once in settings.py. That call wraps every annotated function in the package automatically, so node functions need no @dfg.enforce decorator.

Define schemas and nodes

Schemas live alongside the functions that use them.

# my_pipeline/nodes/raw_nodes.py  (enforced by dfg.arm() in settings.py)
import dfguard.pyspark as dfg
from pyspark.sql import functions as F, types as T


class RawOrderSchema(dfg.SparkSchema):
    order_id:    T.LongType()
    amount:      T.DoubleType()
    quantity:    T.IntegerType()
    customer_id: T.LongType()


def validate_orders(df: RawOrderSchema):
    return df.filter(F.col("amount") > 0)


def deduplicate(df: RawOrderSchema):
    return df.dropDuplicates(["order_id"])

# my_pipeline/nodes/enrichment_nodes.py
import dfguard.pyspark as dfg
from pyspark.sql import functions as F, types as T
from my_pipeline.nodes.raw_nodes import RawOrderSchema


class EnrichedOrderSchema(RawOrderSchema):
    revenue: T.DoubleType()   # inherits all RawOrderSchema fields


def enrich(df: RawOrderSchema):
    return df.withColumn("revenue", F.col("amount") * F.col("quantity"))


def flag_high_value(df: EnrichedOrderSchema):
    return df.withColumn("is_vip", F.col("revenue") > 1000)

Arm everything from settings.py

Kedro loads settings.py before any pipeline runs. One call there wraps every annotated function in the package. Node files keep their schema definitions and type annotations; those are the contract. What they do not need is the @dfg.enforce decorator on each function.

# my_pipeline/settings.py
import dfguard.pyspark as dfg

dfg.arm()

That single call walks every module in the package, finds every function with a schema annotation, and wraps it. Functions without schema annotations are left alone. Within wrapped functions, arguments that are not DataFrames (str, int, etc.) are also left untouched.
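The mechanics are easy to picture. Below is a simplified plain-Python sketch of the wrapping step, not dfguard's actual implementation: SchemaBase and wrap_if_annotated are hypothetical names standing in for dfg.SparkSchema and the wrapping that dfg.arm() performs on each function it finds.

```python
# Sketch of how arm()-style wrapping can work: wrap a function only if
# one of its annotations is a schema class, and check only those
# arguments at call time. Hypothetical names; not dfguard's code.
import functools
import inspect


class SchemaBase:
    """Stand-in for dfg.SparkSchema in this sketch."""
    fields = ()

    @classmethod
    def check(cls, df):
        missing = [f for f in cls.fields if f not in df.columns]
        if missing:
            raise TypeError(f"Schema mismatch: missing fields {missing}")


def wrap_if_annotated(func):
    sig = inspect.signature(func)
    schemas = {
        name: param.annotation
        for name, param in sig.parameters.items()
        if isinstance(param.annotation, type)
        and issubclass(param.annotation, SchemaBase)
    }
    if not schemas:
        return func  # no schema annotations: the function is left alone

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        for name, schema in schemas.items():
            if name in bound.arguments:  # non-schema arguments are not touched
                schema.check(bound.arguments[name])
        return func(*args, **kwargs)

    return wrapper
```

The key point the sketch illustrates: the decision to wrap is made once per function, based purely on its annotations, which is why unannotated functions pass through arm() unchanged.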

Run with:

kedro run

What you get

Passing the wrong DataFrame raises before any Spark work happens:

# raw_df has order_id, amount, quantity, customer_id
validate_orders(raw_df)       # OK

# raw_df is missing 'revenue', which EnrichedOrderSchema requires
flag_high_value(raw_df)
# TypeError: Schema mismatch in flag_high_value() argument 'df':
#   expected: order_id:bigint, amount:double, quantity:int, customer_id:bigint, revenue:double
#   received: order_id:bigint, amount:double, quantity:int, customer_id:bigint

# wrong_df has completely different columns
validate_orders(wrong_df)
# TypeError: Schema mismatch in validate_orders() argument 'df':
#   expected: order_id:bigint, amount:double, quantity:int, customer_id:bigint
#   received: user_id:bigint, name:string

Airflow

Apache Airflow tasks are Python callables. They receive paths or config, create a SparkSession inside, do the work, and write the result. DataFrames are never passed between tasks directly; only serializable values are. See Using dfguard in Airflow for a full working example.

This means @dfg.enforce on task functions doesn’t buy you much (the args are strings, not DataFrames). The useful tools are:

  • MySchema.assert_valid(df) right after loading, which catches schema drift from upstream the moment data enters your task, not halfway through processing

  • @dfg.enforce on helper functions that the task calls internally

Validate after loading

# my_dag.py
import dfguard.pyspark as dfg
from pyspark.sql import types as T


class RawOrderSchema(dfg.SparkSchema):
    order_id:    T.LongType()
    amount:      T.DoubleType()
    quantity:    T.IntegerType()
    customer_id: T.LongType()


class EnrichedOrderSchema(RawOrderSchema):
    revenue: T.DoubleType()


@dfg.enforce
def compute_revenue(df: RawOrderSchema):
    from pyspark.sql import functions as F
    return df.withColumn("revenue", F.col("amount") * F.col("quantity"))


@dfg.enforce
def flag_vip(df: EnrichedOrderSchema):
    from pyspark.sql import functions as F
    return df.withColumn("is_vip", F.col("revenue") > 1000)


def enrich_orders_task(input_path: str, output_path: str) -> str:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()

    df = spark.read.parquet(input_path)

    # Validate right after loading. If upstream changed the schema,
    # you find out here, not in a cryptic Spark error three steps later.
    RawOrderSchema.assert_valid(df)

    enriched = compute_revenue(df)   # @dfg.enforce checks df's schema here
    flagged  = flag_vip(enriched)    # @dfg.enforce checks enriched's schema

    flagged.write.mode("overwrite").parquet(output_path)
    return output_path

assert_valid raises SchemaValidationError with a clear message listing every missing or mismatched field.
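The check itself is a straightforward field diff. Here is a plain-Python sketch of the kind of comparison an assert_valid-style check performs; this is not dfguard's implementation, and SchemaValidationError is defined locally so the sketch is self-contained.

```python
# Sketch of the field diff behind an assert_valid-style check.
# Not dfguard's code; names are illustrative only.
class SchemaValidationError(Exception):
    pass


def assert_fields(expected: dict, actual: dict) -> None:
    """expected/actual map column name -> type name, e.g. {"order_id": "bigint"}."""
    missing = [f for f in expected if f not in actual]
    mismatched = [
        f for f in expected if f in actual and actual[f] != expected[f]
    ]
    if missing or mismatched:
        lines = [f"missing: {f} ({expected[f]})" for f in missing]
        lines += [
            f"mismatched: {f} expected {expected[f]}, got {actual[f]}"
            for f in mismatched
        ]
        raise SchemaValidationError("\n".join(lines))
```

Every missing and mismatched field is collected before raising, so one error message tells you everything that is wrong, not just the first problem found.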

Run with:

airflow dags trigger orders_pipeline

Or trigger via the Airflow web UI at http://localhost:8080.


Global control

Turn enforcement off

dfg.disarm() makes every @dfg.enforce wrapper and every function wrapped by dfg.arm() a straight pass-through. Nothing is checked.

import dfguard.pyspark as dfg
from pyspark.sql import SparkSession, types as T

spark = SparkSession.builder.getOrCreate()
wrong_df = spark.createDataFrame([(1, "Alice")], "user_id LONG, name STRING")

class OrderSchema(dfg.SparkSchema):
    order_id: T.LongType()
    amount:   T.DoubleType()

@dfg.enforce
def process(df: OrderSchema):
    return df

dfg.disarm()
process(wrong_df)        # no error, enforcement is off

dfg.arm()
process(wrong_df)        # raises TypeError again

The flag is global to the Python process. It affects everything, regardless of when or where the functions were imported.
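The reason import order does not matter: the wrapper reads the flag at call time, not at decoration time. A minimal plain-Python sketch of that pattern (hypothetical names, not dfguard's code):

```python
# Why disarm() affects functions wrapped long before it was called:
# the wrapper consults a module-level flag on every invocation.
# Plain-Python sketch with hypothetical names.
import functools

_armed = True


def arm():
    global _armed
    _armed = True


def disarm():
    global _armed
    _armed = False


def enforce(check):
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if _armed:  # read at call time, not when the function was wrapped
                check(*args, **kwargs)
            return func(*args, **kwargs)
        return wrapper
    return decorate
```

Because every wrapper closes over the same module-level flag, a single disarm() call silences all of them at once, in whatever modules they live.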

Environment variable

A clean way to disable enforcement per environment without changing any call sites:

# somewhere at startup, e.g. conftest.py or settings.py
import os
import dfguard.pyspark as dfg

if os.getenv("DFGUARD_DISABLED"):
    dfg.disarm()

Then in CI or non-production environments:

DFGUARD_DISABLED=1 kedro run
DFGUARD_DISABLED=1 pytest

In pytest

If you want most tests to run without schema enforcement (useful when you’re testing logic, not schemas), add this to conftest.py:

import pytest
import dfguard.pyspark as dfg

@pytest.fixture(autouse=True)
def no_schema_enforcement():
    dfg.disarm()
    yield
    dfg.arm()

Tests that specifically test enforcement can call dfg.arm() at the start, or just remove the autouse and opt in explicitly.