Hamilton supports OpenLineage
Hamilton is very extensible - here we explain how Hamilton is now an OpenLineage Producer and can help you wrangle your data lineage efforts.
You don’t know you need it, until you need it.
When data teams are small, it’s simple enough to ask the person next to you, “hey what produced this table prod.customer_orders”, or “what data created this particular predictive churn model?” But as teams grow, and more data is produced, keeping track of how it all relates becomes a challenge.
In this post, we’ll go over how you can track all the data that you produce with Hamilton & OpenLineage. Specifically we will cover what Hamilton & OpenLineage are, and how Hamilton’s new OpenLineage adapter integrates with the OpenLineage ecosystem.
What is Hamilton?
Hamilton is a standardized way of building dataflows, a.k.a. “pipelines”, in Python. The core concepts are simple – you write each data transformation step as a single Python function with the following rules:
The name of the function corresponds to the output variable it computes.
The parameter names (and types) correspond to inputs. These can be either passed-in parameters or names of other upstream functions.
This approach allows you to represent assets in ways that correspond closely to code, are naturally self-documenting, and are portable across infrastructure.
After writing your functions, you call a driver to execute them. In most cases this is a simple import/run (specifying the assets you want computed and letting the framework do the rest), but the driver also provides options to customize execution.
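To make the two naming rules concrete, here is a minimal sketch in plain Python. The functions follow the Hamilton convention, but `resolve` is a hypothetical toy stand-in for the driver, written only for illustration; it is not how Hamilton is implemented. The function names and input values are made up for the example.

```python
import inspect


# The function name is the output it computes; each parameter name is an
# input: either a value passed in by the caller or another function's output.
def spend_per_signup(spend: float, signups: int) -> float:
    return spend / signups


def spend_is_high(spend_per_signup: float, threshold: float) -> bool:
    return spend_per_signup > threshold


def resolve(name, functions, inputs, cache=None):
    """Toy stand-in for a driver: compute `name` by following parameter names."""
    cache = {} if cache is None else cache
    if name in inputs:
        return inputs[name]
    if name not in cache:
        fn = functions[name]
        kwargs = {
            param: resolve(param, functions, inputs, cache)
            for param in inspect.signature(fn).parameters
        }
        cache[name] = fn(**kwargs)
    return cache[name]


functions = {fn.__name__: fn for fn in (spend_per_signup, spend_is_high)}
result = resolve(
    "spend_is_high",
    functions,
    {"spend": 100.0, "signups": 4, "threshold": 20.0},
)
print(result)
```

With Hamilton itself you would not write `resolve`: you put the functions in a module, build a driver with `driver.Builder().with_modules(...).build()`, and ask for the outputs you want via `dr.execute([...], inputs={...})`.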
Hamilton powers a wide range of use cases including feature engineering, ML training/inference pipelines (relevant to this post), RAG pipelines, and more.
Visit tryhamilton.dev for a quick interactive introduction in the browser.
What is OpenLineage?
OpenLineage is a standard for tracking the flow of data (i.e. data lineage) through different tools and systems in an organization. It is a form of observability.
If this is new to you, a shipping tracking system is a good analogy. Imagine you order a product online and you want to know where it is at any moment, from the warehouse to the delivery truck to your doorstep. OpenLineage does something similar: it tracks data as it moves through various stages of processing and transformation within an organization. By having this tracking in place, teams can understand where the data came from, what happened to it, and where it ended up. This helps with:
troubleshooting issues
ensuring data quality
complying with regulations
and more
It becomes easy to understand the data’s journey, like knowing exactly where your package is and when it will arrive.
The OpenLineage standard
OpenLineage is a standard, meaning it is supported by multiple tools, allowing you to have precise lineage as data moves through different components of your system (ETL, machine learning training and inference, predictions, etc.). The standard is implemented as a JSON schema that describes “data events”.
The OpenLineage ecosystem is composed of producers, which are tools that transform data and generate OpenLineage events (e.g., Hamilton, Airflow, dbt, Dagster, Snowflake, Great Expectations, etc.), and consumers which are platforms to observe and catalog data transformations across producers (Amundsen, Marquez, OpenMetadata, etc.).
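To give a feel for what a producer sends to a consumer, here is a rough sketch of a minimal run event built as a plain Python dict. The top-level field names follow the OpenLineage spec as we understand it; the helper `build_run_event`, the producer URI, and the dataset names are hypothetical, and a real event also carries a `schemaURL` field and optional facets that are left out here.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(event_type, job_namespace, job_name, inputs, outputs, run_id=None):
    """Assemble a minimal OpenLineage-style run event as a plain dict."""
    return {
        "eventType": event_type,  # e.g. START, COMPLETE, FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id or str(uuid.uuid4())},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        "producer": "https://example.com/my-producer",  # hypothetical URI
    }


event = build_run_event(
    "COMPLETE",
    "my_namespace",
    "my_job_name",
    inputs=[("file", "data/users.csv")],  # illustrative dataset names
    outputs=[("file", "models/churn.pkl")],
)
print(json.dumps(event, indent=2))
```

In practice you rarely build these by hand; a producer such as Hamilton's adapter emits them for you through an OpenLineage client.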
Who needs lineage?
Everyone wants lineage, but nobody wants to implement it. In sectors where auditing and governance are requirements, OpenLineage is critical to understanding data impacts and complying with regulation. However, even when it’s not critical, OpenLineage can be an enormous boost for productivity because it allows you to monitor what data within your organization is used and plan changes that won’t break someone else’s pipeline. Because lineage is hard to implement, teams avoid the extra effort to integrate it, but Hamilton makes it easy for you! In fact, you get a lot of “lineage” with Hamilton, even more than what’s specified in the OpenLineage specification: you get logical lineage and provenance with Hamilton, and then observability with a single line if you’re using the HamiltonTracker with the Hamilton UI. In this post we’re leveraging this to make it easy to emit OpenLineage events.
How Hamilton produces OpenLineage events
To work, OpenLineage needs to know what data was loaded & saved. When writing your Hamilton code, you can generate the required metadata by using the materializer feature. There are several ways to use it:
1. Use the @dataloader and @datasaver decorators to load/save your data and produce metadata (see example below).
2. Materializer decorators with .execute().
3. Materializer classes with either .with_materializers() or .materialize().
Below we show the first approach (1):
import pickle
from typing import Tuple

import pandas as pd
from hamilton.function_modifiers import dataloader, datasaver
from hamilton.io import utils


@dataloader()
def user_dataset(file_ds_path: str) -> Tuple[pd.DataFrame, dict]:
    # return the loaded data plus metadata describing where it came from
    df = pd.read_csv(file_ds_path)
    return df, utils.get_file_and_dataframe_metadata(file_ds_path, df)

...

# ModelObject stands in for the type of your fitted model
@datasaver()
def saved_file(fit_model: ModelObject, file_path: str) -> dict:
    # save the model and return metadata describing what was written
    with open(file_path, "wb") as f:
        pickle.dump(fit_model, f)
    return utils.get_file_metadata(file_path)
Importantly, you don’t need to import anything related to OpenLineage when defining your data transformations. Your code remains platform agnostic, as shown in the above example.
To integrate with OpenLineage, we use Hamilton’s very extensible lifecycle API system. The lifecycle API allows you to quickly write custom “adapters” that can observe things before and after graph & node execution. The OpenLineage adapter will produce lineage events as you execute your dataflow. To enable this feature, simply add the adapter to your Hamilton Driver:
from hamilton import driver
from hamilton.plugins import h_openlineage
from openlineage.client import OpenLineageClient

# import your pipeline code
import my_dataflow

# write to HTTP, e.g. marquez; there are others like Kafka, File, etc.
client = OpenLineageClient(url="http://localhost:5000")
# create the adapter
adapter = h_openlineage.OpenLineageAdapter(client, "my_namespace", "my_job_name")
# add to Hamilton
dr = (
    driver.Builder()
    .with_modules(my_dataflow)
    .with_adapters(adapter)  # <-- add the adapter here
    .build()
)
# execute as normal -- and openlineage events will be emitted
dr.execute(...)
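If the idea of an adapter that observes execution is unfamiliar, the following toy sketch shows the general pattern: hooks that fire before and after a run and record START, COMPLETE, or FAIL events. Everything here (`RecordingAdapter`, `before_graph`, `after_graph`, `run_with_adapter`) is hypothetical and written for illustration; it is not Hamilton's lifecycle API, and the real adapter sends events through the OpenLineage client rather than appending to a list.

```python
class RecordingAdapter:
    """Toy adapter: collects events in memory instead of sending them anywhere."""

    def __init__(self):
        self.events = []

    def before_graph(self, job_name):
        self.events.append(("START", job_name))

    def after_graph(self, job_name, error=None):
        self.events.append(("FAIL" if error else "COMPLETE", job_name))


def run_with_adapter(job_name, steps, adapter):
    """Toy stand-in for a driver that calls the adapter's hooks around execution."""
    adapter.before_graph(job_name)
    try:
        results = [step() for step in steps]
    except Exception as exc:
        # report the failure to the adapter, then let the error propagate
        adapter.after_graph(job_name, error=exc)
        raise
    adapter.after_graph(job_name)
    return results


adapter = RecordingAdapter()
results = run_with_adapter("my_job_name", [lambda: 1, lambda: 2], adapter)
print(adapter.events)
```

The point of the pattern is that the steps themselves never mention lineage; the observing code lives entirely in the adapter.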
In terms of OpenLineage specifics, currently Hamilton emits the following facets:
SourceCodeJobFacet - with the entire source code of your Hamilton dataflow.
JobTypeJobFacet - specifying a BATCH job.
A custom HamiltonFacet - that specifies the parameterization of the Hamilton dataflow.
A StorageDatasetFacet & DatasourceDatasetFacet - for files loaded & saved.
A SQLJobFacet - if SQL is detected as being used.
An ErrorMessageRunFacet - if your Hamilton dataflow execution fails.
If you need more facets, please open up an issue / open up a PR to contribute!
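For orientation, here is a rough sketch of where the standard facets sit inside an event and what their main fields look like. The keys and field names follow the OpenLineage spec as we understand it; all values are illustrative assumptions (including the `integration` and `jobType` strings), real facets also carry `_producer` and `_schemaURL` fields, and the exact payload Hamilton emits may differ. The custom HamiltonFacet is left out because its shape is not described here.

```python
import json

# Job-level facets describe the job definition itself.
job_facets = {
    "sourceCode": {"language": "python", "sourceCode": "def user_dataset(...): ..."},
    "jobType": {"processingType": "BATCH", "integration": "HAMILTON", "jobType": "DAG"},
    "sql": {"query": "SELECT * FROM prod.customer_orders"},  # only if SQL is used
}

# Run-level facets describe one execution, e.g. a failure.
run_facets = {
    "errorMessage": {
        "message": "file not found",
        "programmingLanguage": "python",
        "stackTrace": "Traceback (most recent call last): ...",
    }
}

# Dataset-level facets describe each file that was loaded or saved.
dataset_facets = {
    "storage": {"storageLayer": "file", "fileFormat": "csv"},
    "dataSource": {"name": "local_files", "uri": "file:///data/users.csv"},
}

print(json.dumps({"job": job_facets, "run": run_facets, "dataset": dataset_facets}, indent=2))
```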
Example code
If you’re after some sample code, we’ve got an example here to help you get started.
Use case: Boost Airflow pipeline observability with Hamilton
If you’re using Airflow’s Python & Kubernetes operators, you can’t emit lineage events from arbitrary Python code. But if that code is running Hamilton(!), you can now easily make sure that all your data, ML, and AI workflows provide lineage and visibility over what’s happening.
A sketch of the code would be the following:
Put Hamilton within the Python / Kubernetes task.
Add the OpenLineage adapter when constructing the Driver.
Execute as normal — and gain OpenLineage insights everywhere!
from airflow.decorators import task


@task
def my_python_task(some_argument):
    """This is a function that will run within Airflow"""
    from hamilton import driver
    from hamilton.plugins import h_openlineage
    from openlineage.client import OpenLineageClient

    # import your pipeline code here; YOUR_MODULES is a placeholder for it
    # write to HTTP, e.g. marquez
    client = OpenLineageClient(url="http://localhost:5000")
    # create the adapter
    adapter = h_openlineage.OpenLineageAdapter(client, "my_namespace", "my_job_name")
    # add to Hamilton
    dr = (
        driver.Builder()
        .with_modules(YOUR_MODULES)
        .with_adapters(adapter)  # <-- add the adapter here
        .build()
    )
    # execute as normal -- and openlineage events will be emitted
    dr.execute(...)
Does the Hamilton UI consume OpenLineage events?
Not yet; an integration is on the roadmap.
Summary
This post introduced Hamilton, OpenLineage, and how they can power your data lineage efforts. Hamilton helps fill observability and lineage gaps in Python code via its graph structure and OpenLineage event emission.
Hamilton is now an OpenLineage producer, and the Hamilton UI should become an OpenLineage consumer soon! This will provide the single pane of glass connecting data, ML, and AI observability.
We Want to Hear from You
If you’re excited by any of this, or have strong opinions, drop by our Slack channel or leave a comment here! Some resources to help you:
📣 join our community on Slack — we’re more than happy to help answer questions you might have or get you started.
⭐️ Hamilton on GitHub
📝 leave us an issue if you find something