Skip to content

Basic usage

ExTraLo allows you to explicitly define the ETL pipeline. This is the simplest way to use ExTraLo. In this example, we will extract data from a csv file, transform it and load it to a SQLite database.

First, we define the transformation that will be applied to the data. The way we define this transformation is by creating a class that exposes a transform method. This method can receive named arguments with the same name as the name define in the sources definition.

In this case, we will calculate the number of days since the policy start date:

transformer.py
import pandas as pd


def my_transformer(data):
    data["policy_start_date"] = pd.to_datetime(data["policy_start_date"])
    data["days_since_start"] = (pd.Timestamp.now() - data["policy_start_date"]).dt.days
    return {"data": data}

Notice how we defined the argument to my_transform function with the name data. This name must be the same name used in the sources definition in the next step. Also, notice how we returned a dict of DataFrame. This is required since we could return multiple data from this step.

Lets define a SQLite database engine to use as destination:

etl.py
from sqlalchemy import create_engine
from transformer import my_transformer

from extralo import ETL, CSVSource, SQLDestination

engine = create_engine("sqlite:///data.sqlite")

etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    transformer=my_transformer,
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

etl.execute()

Now we can define the ETL pipeline. We can import the transformer we created earlier to transform the data:

etl.py
from sqlalchemy import create_engine
from transformer import my_transformer

from extralo import ETL, CSVSource, SQLDestination

engine = create_engine("sqlite:///data.sqlite")

etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    transformer=my_transformer,
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

etl.execute()

And finally run it:

$ python etl.py
Transformer output type hints are not a TypedDict, validation will be done only at runtime.

And our data is extracted from the csv file, transformed and loaded to the SQLite database. We will talk about that output message in the future.

We can query the SQLite database to see the data loaded:

query.py
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite:///data.sqlite")

print(pd.read_sql("data_group", engine))
$ python query.py
    client policy_start_date  days_since_start
0    Alice        2024-01-01               216
1      Bob        2024-02-02               184
2  Charlie        2024-03-03               154
3    David        2024-04-04               122
4      Eve        2024-05-05                91

This is the simpleste example, but ExTraLo can do more. In the next sections, you will see how to add logging and validation to the process, and also how to extract from multiple source and load to multiple destinations.