Validate data with pandera

It's highly recommended to add validation to the transformer call. This ensures the data is in the expected format and makes the pipeline fail early if a source changes unexpectedly.

In the previous examples, we did not define schemas for the validation, so the validation was never executed. We can create some pandera models to validate the data:

models.py
import pandera as pa


class BeforeSchema(pa.DataFrameModel):
    client: str
    policy_start_date: pa.DateTime


class AfterSchema(pa.DataFrameModel):
    client: str
    policy_start_date: pa.DateTime
    days_since_start: int

Then use pydantic together with pandera to validate the data in the transformer:

etl.py
import pandas as pd
from models import AfterSchema, BeforeSchema
from pandera.typing import DataFrame
from pydantic import validate_call
from sqlalchemy import create_engine

from extralo import ETL, CSVSource, SQLDestination

engine = create_engine("sqlite:///data.sqlite")


@validate_call(validate_return=True)
def my_transformer(data: DataFrame[BeforeSchema]) -> dict[str, DataFrame[AfterSchema]]:
    data["policy_start_date"] = pd.to_datetime(data["policy_start_date"])
    data["days_since_start"] = (pd.Timestamp.now() - data["policy_start_date"]).dt.days
    return {"data": data}


etl = ETL(
    sources={"data": CSVSource("data.csv")},
    transformer=my_transformer,
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

etl.execute()

If we run the ETL pipeline now, we will get an error: the policy_start_date column arrives with object dtype, while BeforeSchema expects a datetime column.

$ python etl.py
Traceback (most recent call last):
...
pydantic_core._pydantic_core.ValidationError: 1 validation error for my_transformer
data
  Value error, expected series 'policy_start_date' to have type datetime64[ns], got object [type=value_error, input_value=    client policy_start_d...   Eve        2024-05-05, input_type=DataFrame]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error
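
The mismatch can be reproduced with pandas alone. The sketch below assumes the CSV source behaves roughly like `pandas.read_csv` without date parsing (an assumption about extralo, not something stated here), and uses a hypothetical two-row CSV in place of `data.csv`:

```python
import io

import pandas as pd
from pandas.api.types import is_datetime64_any_dtype

# Hypothetical CSV content standing in for data.csv.
csv_text = "client,policy_start_date\nAlice,2024-01-01\nEve,2024-05-05\n"

# Without explicit date parsing, the date column is read as plain strings.
df = pd.read_csv(io.StringIO(csv_text))
loaded_as_datetime = is_datetime64_any_dtype(df["policy_start_date"])

# Converting explicitly yields a datetime column, which is what the
# transformer does in its first line.
df["policy_start_date"] = pd.to_datetime(df["policy_start_date"])
converted_to_datetime = is_datetime64_any_dtype(df["policy_start_date"])

print(loaded_as_datetime, converted_to_datetime)
```

So the input schema has to describe the data as it arrives (strings), and only the output schema should require a datetime column.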

Let's fix it by declaring policy_start_date as str in BeforeSchema, and try again:

models.py
import pandera as pa


class BeforeSchema(pa.DataFrameModel):
    client: str
    policy_start_date: str


class AfterSchema(pa.DataFrameModel):
    client: str
    policy_start_date: pa.DateTime
    days_since_start: int

$ python etl.py

Now the ETL pipeline should run without errors.