Schemas, and Composable DataFrame Contracts

In this tutorial, we demonstrate how to build robust, production-grade data validation pipelines using Pandera with typed DataFrame models. We start by simulating realistic, imperfect transactional data and progressively enforce strict schema constraints, column-level rules, and cross-column business logic using declarative checks. We show how lazy validation helps us surface multiple data quality issues at once, how invalid records can be quarantined without breaking pipelines, and how schema enforcement can be applied directly at function boundaries to guarantee correctness as data flows through transformations.
!pip -q install "pandera>=0.18" pandas numpy polars pyarrow hypothesis
import json
import numpy as np
import pandas as pd
import pandera as pa
from pandera.errors import SchemaError, SchemaErrors
from pandera.typing import Series, DataFrame
print("pandera version:", pa.__version__)
print("pandas version:", pd.__version__)
We set up the execution environment by installing Pandera and its dependencies and importing all required libraries. We confirm library versions to ensure reproducibility and compatibility. This establishes a clean foundation for enforcing typed data validation throughout the tutorial.
rng = np.random.default_rng(42)
def make_raw_orders(n=250):
    countries = np.array(["CA", "US", "MX"])
    channels = np.array(["web", "mobile", "partner"])
    raw = pd.DataFrame(
        {
            "order_id": rng.integers(1, 120, size=n),
            "customer_id": rng.integers(1, 90, size=n),
            "email": rng.choice(
                ["alice@example.com", "bob@example.com", "bad_email", None],
                size=n,
                p=[0.45, 0.45, 0.07, 0.03],
            ),
            "country": rng.choice(countries, size=n, p=[0.5, 0.45, 0.05]),
            "channel": rng.choice(channels, size=n, p=[0.55, 0.35, 0.10]),
            "items": rng.integers(0, 8, size=n),
            "unit_price": rng.normal(loc=35, scale=20, size=n),
            "discount": rng.choice([0.0, 0.05, 0.10, 0.20, 0.50], size=n, p=[0.55, 0.15, 0.15, 0.12, 0.03]),
            "ordered_at": pd.to_datetime("2025-01-01") + pd.to_timedelta(rng.integers(0, 120, size=n), unit="D"),
        }
    )
    # Deliberately inject data quality problems: negative prices, zero-item orders,
    # out-of-policy discounts, unknown categories, and string-typed prices.
    raw.loc[rng.choice(n, size=8, replace=False), "unit_price"] = -abs(raw["unit_price"].iloc[0])
    raw.loc[rng.choice(n, size=6, replace=False), "items"] = 0
    raw.loc[rng.choice(n, size=5, replace=False), "discount"] = 0.9
    raw.loc[rng.choice(n, size=4, replace=False), "country"] = "ZZ"
    raw.loc[rng.choice(n, size=3, replace=False), "channel"] = "unknown"
    raw.loc[rng.choice(n, size=6, replace=False), "unit_price"] = raw["unit_price"].iloc[:6].round(2).astype(str).values
    return raw
raw_orders = make_raw_orders(250)
display(raw_orders.head(10))
We generate a realistic transactional dataset that intentionally includes common data quality issues. We simulate invalid values, inconsistent types, and unexpected categories to reflect real-world ingestion scenarios. This lets us meaningfully test and demonstrate the effectiveness of schema-based validation.
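Before writing any schema, it can help to quantify the problems we just planted. The following quick sketch is an optional addition (assuming the raw_orders frame built above) that counts each category of corruption:
# Optional sketch: profile the injected issues in raw_orders before validation.
numeric_price = pd.to_numeric(raw_orders["unit_price"], errors="coerce")
print("negative unit_price rows:", int(numeric_price.lt(0).sum()))
print("zero-item rows:", int((raw_orders["items"] == 0).sum()))
print("out-of-policy discounts (>0.8):", int((raw_orders["discount"] > 0.8).sum()))
print("unknown countries:", int((~raw_orders["country"].isin(["CA", "US", "MX"])).sum()))
print("unknown channels:", int((~raw_orders["channel"].isin(["web", "mobile", "partner"])).sum()))
print("string-typed prices:", int(raw_orders["unit_price"].map(type).eq(str).sum()))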
EMAIL_RE = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"
class Orders(pa.DataFrameModel):
    order_id: Series[int] = pa.Field(ge=1)
    customer_id: Series[int] = pa.Field(ge=1)
    email: Series[object] = pa.Field(nullable=True)
    country: Series[str] = pa.Field(isin=["CA", "US", "MX"])
    channel: Series[str] = pa.Field(isin=["web", "mobile", "partner"])
    items: Series[int] = pa.Field(ge=1, le=50)
    unit_price: Series[float] = pa.Field(gt=0)
    discount: Series[float] = pa.Field(ge=0.0, le=0.8)
    ordered_at: Series[pd.Timestamp]

    class Config:
        coerce = True
        strict = True
        ordered = False

    @pa.check("email")
    def email_valid(cls, s: pd.Series) -> pd.Series:
        return s.isna() | s.astype(str).str.match(EMAIL_RE)

    @pa.dataframe_check
    def total_value_reasonable(cls, df: pd.DataFrame) -> pd.Series:
        total = df["items"] * df["unit_price"] * (1.0 - df["discount"])
        return total.between(0.01, 5000.0)

    @pa.dataframe_check
    def channel_country_rule(cls, df: pd.DataFrame) -> pd.Series:
        ok = ~((df["channel"] == "partner") & (df["country"] == "MX"))
        return ok
We define a strict Pandera DataFrameModel that captures both structural and business-level constraints. We apply column-level rules, regex-based validation, and dataframe-wide checks to declaratively encode domain logic.
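For reference, the same column-level constraints can also be expressed with Pandera's object-based API. Below is a minimal, partial sketch (covering only a few columns and reusing EMAIL_RE) showing how the class-based model maps onto a DataFrameSchema; it is illustrative and not used later in the tutorial:
# Partial, illustrative equivalent of the model above using the object-based API.
orders_schema = pa.DataFrameSchema(
    {
        "order_id": pa.Column(int, pa.Check.ge(1)),
        "country": pa.Column(str, pa.Check.isin(["CA", "US", "MX"])),
        "items": pa.Column(int, [pa.Check.ge(1), pa.Check.le(50)]),
        "unit_price": pa.Column(float, pa.Check.gt(0)),
        "email": pa.Column(object, pa.Check.str_matches(EMAIL_RE), nullable=True),
    },
    coerce=True,
    strict=False,  # only a subset of columns is declared in this sketch
)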
try:
    validated = Orders.validate(raw_orders, lazy=True)
    print(validated.dtypes)
except SchemaErrors as exc:
    display(exc.failure_cases.head(25))
    err_json = exc.failure_cases.to_dict(orient="records")
    print(json.dumps(err_json[:5], indent=2, default=str))
We validate the raw dataset using lazy evaluation to surface multiple violations in a single pass. We inspect structured failure cases to understand exactly where and why the data breaks schema rules. This helps us debug data quality issues without interrupting the entire pipeline.
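When a compact overview is more useful than row-level failure cases, the report can be aggregated. A small sketch (re-running the lazy validation, since the exception variable does not persist outside the except block above):
# Sketch: tally failures per column/check to see which rules are violated most often.
try:
    Orders.validate(raw_orders, lazy=True)
except SchemaErrors as err:
    summary = (
        err.failure_cases
        .groupby(["column", "check"], dropna=False)
        .size()
        .sort_values(ascending=False)
    )
    print(summary)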
def split_clean_quarantine(df: pd.DataFrame):
    # Fast path: the whole frame already satisfies the schema.
    try:
        clean = Orders.validate(df, lazy=False)
        return clean, df.iloc[0:0].copy()
    except SchemaError:
        pass
    # Otherwise collect every failing row via lazy validation and quarantine it.
    try:
        Orders.validate(df, lazy=True)
        return df.copy(), df.iloc[0:0].copy()
    except SchemaErrors as exc:
        bad_idx = sorted(set(exc.failure_cases["index"].dropna().astype(int).tolist()))
        quarantine = df.loc[bad_idx].copy()
        clean = df.drop(index=bad_idx).copy()
        return Orders.validate(clean, lazy=False), quarantine
clean_orders, quarantine_orders = split_clean_quarantine(raw_orders)
display(quarantine_orders.head(10))
display(clean_orders.head(10))
@pa.check_types
def enrich_orders(df: DataFrame[Orders]) -> DataFrame[Orders]:
    out = df.copy()
    out["unit_price"] = out["unit_price"].round(2)
    out["discount"] = out["discount"].round(2)
    return out
enriched = enrich_orders(clean_orders)
display(enriched.head(5))
We separate valid records from invalid ones by quarantining rows that fail schema checks. We then enforce schema guarantees at function boundaries to ensure only trusted data is transformed. This pattern enables safe data enrichment while preventing silent corruption.
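To see the boundary guard in action, we can hand enrich_orders a frame that deliberately violates the schema. The sketch below (using a throwaway corrupted copy of clean_orders) expects the call to fail before any transformation runs:
# Sketch: a deliberately corrupted copy should be rejected by @pa.check_types before enrichment runs.
bad = clean_orders.copy()
bad.loc[bad.index[0], "unit_price"] = -1.0  # violates the unit_price > 0 constraint
try:
    enrich_orders(bad)
except SchemaError as err:
    print("rejected at function boundary:", type(err).__name__)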
class EnrichedOrders(Orders):
    total_value: Series[float] = pa.Field(gt=0)

    class Config:
        coerce = True
        strict = True

    @pa.dataframe_check
    def totals_consistent(cls, df: pd.DataFrame) -> pd.Series:
        total = df["items"] * df["unit_price"] * (1.0 - df["discount"])
        return (df["total_value"] - total).abs() <= 1e-6

@pa.check_types
def add_totals(df: DataFrame[Orders]) -> DataFrame[EnrichedOrders]:
    out = df.copy()
    out["total_value"] = out["items"] * out["unit_price"] * (1.0 - out["discount"])
    return EnrichedOrders.validate(out, lazy=False)
enriched2 = add_totals(clean_orders)
display(enriched2.head(5))
We extend the base schema with a derived column and validate cross-column consistency using composable schemas. We verify that computed values obey strict numerical invariants after transformation. This demonstrates how Pandera supports safe feature engineering with enforceable guarantees.
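As a final sanity check, we can confirm the consistency rule actually bites: tampering with a single derived total in a copy of enriched2 should make validation fail. A brief, optional sketch:
# Sketch: corrupt one derived total and confirm the totals_consistent check rejects it.
tampered = enriched2.copy()
tampered.loc[tampered.index[0], "total_value"] += 10.0
try:
    EnrichedOrders.validate(tampered, lazy=False)
except SchemaError as err:
    print("inconsistent total caught:", type(err).__name__)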
In conclusion, we established a disciplined approach to data validation that treats schemas as first-class contracts rather than optional safeguards. We demonstrated how schema composition enables us to safely extend datasets with derived features while preserving invariants, and how Pandera seamlessly integrates into real analytical and data-engineering workflows. Through this tutorial, we ensured that every transformation operates on trusted data, enabling us to build pipelines that are transparent, debuggable, and resilient in real-world environments.
