# Validate data with pandera

It's highly recommended to add validation to the transformer call. Validation ensures the data is in the expected format, and makes the pipeline fail early if a source changes unexpectedly.
In the previous examples, we did not define schemas for the validation, so the validation was never executed. We can create some pandera models to validate the data:
models.py

```python
import pandera as pa


class BeforeSchema(pa.DataFrameModel):
    client: str
    policy_start_date: pa.DateTime


class AfterSchema(pa.DataFrameModel):
    client: str
    policy_start_date: pa.DateTime
    days_since_start: int
```
And use pydantic + pandera to validate the data in the transformer:
etl.py

```python
import pandas as pd
from models import AfterSchema, BeforeSchema
from pandera.typing import DataFrame
from pydantic import validate_call
from sqlalchemy import create_engine

from extralo import ETL, CSVSource, SQLDestination

engine = create_engine("sqlite:///data.sqlite")


@validate_call(validate_return=True)
def my_transformer(data: DataFrame[BeforeSchema]) -> dict[str, DataFrame[AfterSchema]]:
    data["policy_start_date"] = pd.to_datetime(data["policy_start_date"])
    data["days_since_start"] = (pd.Timestamp.now() - data["policy_start_date"]).dt.days
    return {"data": data}


etl = ETL(
    sources={"data": CSVSource("data.csv")},
    transformer=my_transformer,
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

etl.execute()
```
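To try the pipeline locally, the `CSVSource` needs a `data.csv` to read. A minimal hypothetical file (the client names and dates are made-up sample data, chosen to match the columns of `BeforeSchema`) can be written like this:

```python
# Write a tiny hypothetical data.csv for the CSVSource to read.
# The header matches the columns BeforeSchema expects.
import csv

rows = [
    {"client": "Alice", "policy_start_date": "2024-01-15"},
    {"client": "Eve", "policy_start_date": "2024-05-05"},
]

with open("data.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["client", "policy_start_date"])
    writer.writeheader()
    writer.writerows(rows)
```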
If we run the ETL pipeline now, we get an error, because the `policy_start_date` column is not in the format the input schema expects.
```
$ python etl.py
Traceback (most recent call last):
  ...
pydantic_core._pydantic_core.ValidationError: 1 validation error for my_transformer
data
  Value error, expected series 'policy_start_date' to have type datetime64[ns], got object [type=value_error, input_value= client policy_start_d... Eve 2024-05-05, input_type=DataFrame]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error
```
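The root cause is that pandas reads date columns from a CSV as plain strings (dtype `object`) unless told otherwise, so the data handed to the transformer can never match `pa.DateTime`. A pandas-only sketch of what the source produces (the inline CSV is a made-up stand-in for `data.csv`):

```python
import io

import pandas as pd

# Simulate what the CSV source loads: dates arrive as strings.
csv_data = io.StringIO("client,policy_start_date\nEve,2024-05-05\n")
df = pd.read_csv(csv_data)

print(df["policy_start_date"].dtype)  # object, i.e. plain strings
print(pd.to_datetime(df["policy_start_date"]).dtype)  # datetime64[ns]
```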
The input data really does arrive as strings: the conversion to datetime only happens inside the transformer. So the input schema should declare `policy_start_date` as `str`, while the output schema keeps `pa.DateTime`:

models.py

```python
import pandera as pa


class BeforeSchema(pa.DataFrameModel):
    client: str
    policy_start_date: str


class AfterSchema(pa.DataFrameModel):
    client: str
    policy_start_date: pa.DateTime
    days_since_start: int
```
Now the ETL pipeline should run without errors.