Data Quality with Hamilton and Pandera
A post to highlight the lightweight options and customizations Hamilton provides with respect to data quality; one of them being Pandera
If you’ve ever built any pipelines / workflows that deal with data and had to manage them, you’ve surely encountered “data quality” issues.
In general the situation occurs because data has changed from your initial expectations. In machine learning terms this can be called data drift or concept drift. For example you might have an expectation that only a certain set of values are allowed to be found in a certain column, but one day they’re all empty! Or that a new value crept in and your code can’t handle it, or maybe that the distribution of the data has changed which breaks some assumptions of your machine learning model…
For most, mitigating these challenges means having to manage and integrate separate systems (e.g. great expectations, monte carlo, etc.) that can check data against some set of expectations the user has provided. In addition, for most of these solutions they also only focus on data at rest, e.g. data in a SQL accessible database, and don’t cater to the other types of objects that pipelines create, e.g. dataframes, models, your custom objects etc.
In this post I’ll walk you through the tools that Hamilton provides, out of the box, that will cover the majority of your data quality use cases.
We’ll structure the rest of this post as follows:
A brief refresher of Hamilton.
Shift Left: the Hamilton approach to data quality and how it works.
Hamilton’s Pandera integration
Future work
What is Hamilton?
Hamilton is a standardized way of building data pipelines in Python. The core concepts are simple – you write each data transformation step as a single Python function with the following rules:
The name of the function corresponds to the output variable it computes.
The parameter names (and types) correspond to inputs. These can be either passed-in parameters or names of other upstream functions.
This approach allows you to represent assets in ways that correspond closely to code, naturally self-documenting, and portable across infrastructure.
After writing your functions, you call a driver to execute them – in most cases this is a simple import/run (specifying the assets you want computed and letting the framework do the rest), but it provides options to customize execution:
Hamilton powers a wide range of use cases including feature engineering, ML training/inference pipelines (relevant to this post), RAG pipelines, and more. For a quick 101 interactive in the browser session you can browse tryhamilton.dev
Shift Left: Hamilton’s Approach to Data Quality
Hamilton takes the concept of “Shift Left” to the extreme. “Shift Left” is an idea that by moving or tackling tasks earlier in a process' timeline (i.e. “further left”), in the case of Hamilton right there with your code, you can significantly increase efficiency because you can catch things earlier. The net result is that you can catch and react quicker, resulting in a better prepared organization.
@check_output & @check_output_custom
check_output and check_output_custom are two decorators that you can annotate your Hamilton functions with. What they do, as hopefully their names suggest, allow you to set expectations on the output of a function. We classify them as “shift left” to the extreme, since they sit right alongside your logic! You can’t get closer than that!
Let’s introduce them by walking through an example, and then talk about execution and getting results.
@check_output
Let’s say we have the following two functions:
import pandas as pd
def avg_3wk_spend(spend: pd.Series) -> pd.Series:
"""Rolling 3 week average spend."""
return spend.rolling(3, min_periods=1).mean()
def acquisition_cost(avg_3wk_spend: pd.Series,
signups: pd.Series) -> pd.Series:
"""The cost per signup in relation to a rolling average of spend."""
return avg_3wk_spend / signups
Now, let's say we want to assure a few things about acquisition_cost
:
That it consists of
float
s (should be obvious from the code, but we want to be sure)That it is greater than
0
(highly unlikely that a customer pays you for advertising)That it is less than `$1000` (anything this high likely means a data issue)
Furthermore, let's say we want the pipeline to log a warning (as opposed to just failing out) if any of the above conditions aren't met.
This is easy with the check_output
decorator!
# my_functions.py
import pandas as pd
import numpy as np
from hamilton.function_modifiers import check_output
def avg_3wk_spend(spend: pd.Series) -> pd.Series:
"""Rolling 3 week average spend."""
return spend.rolling(3, min_periods=1).mean()
@check_output(
range=(0,1000),
data_type=np.float64,
importance="warn"
)
def acquisition_cost(avg_3wk_spend: pd.Series,
signups: pd.Series) -> pd.Series:
"""The cost per signup in relation to a rolling average of spend."""
return avg_3wk_spend / signups
It takes in a series of arguments (for list of available ones see this source code — sorry we’ll add these to the docs soon!), and as indicated by their names, perform a validation on the output; it’s easy to check for types, ranges, and values. The actual implementation depends on the output type of the function. For those that are looking for dataframe validations, see the Pandera section below.
Yes, it’s that straightforward: decorate function, specify checks & their values. To see more examples of this approach see this example.
Note: if you’re doing LLMs and want to always ensure a key word is present in the response, use the `contains=` key word argument with a function signature that returns a string type.
A note on importance levels
Currently there are two available importance levels:
"warn"
"fail"
They do exactly as you'd expect. "warn" logs the failure to the terminal and continues on. "fail" raises an exception in the final node.
@check_output_custom
As the name suggests, this provides you a way to customize validation completely. To add a custom validator, you need to implement the DataValidator class, and then provide it to the decorator. You can then use the @check_output_custom
decorator to run it on a function. For example:
from hamilton.function_modifiers import check_output_custom
@check_output_custom(AllEvenValidator(...), )
def even_number_generator(num_ints: int) -> list[int]:
...
Your custom AllEvenValidator can be defined as:
from hamilton.data_quality.base import DataValidator, ValidationResult
class AllEvenValidator(DataValidator):
"""Checks that numbers are even"""
# add any values you need here
def __init__(self, importance: str = "fail"):
super(AllEvenValidator, self).__init__(importance=importance)
def applies_to(self, datatype: Type[Type]) -> bool:
"""Whether or not this data validator can apply to
the specified dataset
:param datatype:
:return: True if it can be run on the specified type.
"""
return datatype == list[int] or datatype == pd.Series
def description(self) -> str:
"""Gives a description of this validator.
:return: The description of the validator as a string
"""
return "checks whether all numbers are even"
@classmethod
def name(cls) -> str:
"""Returns the name for this validator."""
return "EvenNumberValidator"
def validate(self,
dataset: Union[list[int], pd.Series]
) -> ValidationResult:
"""Actually performs the validation. Note when you
:param dataset: dataset to validate
:return: The result of validation
"""
valid = all([d % 2 == 0 for d in dataset])
return ValidationResult(
passes=valid,
message="... your message ...",
diagnostics={"length": len(dataset)}
)
This gives you a lot of flexibility in what actually happens after a function is run.
You can use this to write your own validators that:
Go to a database and validate the results by running a SQL query.
Ping an external service to get the validations you want to run.
etc.
If you happen to build something here that you’d like to contribute back to the community, open up a pull request!
Running your Pipelines and Examining the Results
Let's run the first pipeline above to get the result of acquisition_cost
, injecting some bad data along the way...
# run.py
import logging
import sys
import pandas as pd
from hamilton import driver
# code defined above
import my_functions
logging.basicConfig(stream=sys.stdout)
inputs = {
'signups': pd.Series([1, 10, 50, 100, 200, 400]),
'spend': pd.Series([10, 10, 20, 40, 40, 50]),
}
dr = driver.Builder().with_modules(my_functions).build()
df = dr.execute(['acquisition_cost'], inputs=inputs)
print(df.to_string())
When we run this, we get the following:
python run.py
WARNING:hamilton.data_quality.base:[acquisition_cost:range_validator]
validator failed. Message was: Series contains 5 values in range (0,1000), and 1 outside.. Diagnostic information is:
{'range': (0, 1000), 'in_range': 5, 'out_range': 1, 'data_size': 6}.
acquisition_cost
0 10.000000
1 5000.500000
2 666.866667
3 333.533333
4 0.166667
5 0.483333
Note that it completed successfully, yet printed out the warning. When we set it to “fail”
, we get an error (as expected). Let's go back to look at the results, and see what we can learn...
If we modify our script to return the data quality results as well, we can capture the results for later use!
import logging
import sys
import dataclasses
import pprint
import pandas as pd
from hamilton import driver
import my_functions # defined above
logging.basicConfig(stream=sys.stdout)
inputs = {
'signups': pd.Series([1, 10, 50, 100, 200, 400]),
'spend': pd.Series([10, 100000, 20, 40, 40, 500]),
}
dr = driver.Builder().with_modules(my_functions).build()
# find all DQ results we can grab
all_validator_variables = [
var.name for var in dr.list_available_variables() if
var.tags.get('hamilton.data_quality.contains_dq_results')]
data = dr.execute(['acquisition_cost'] + all_validator_variables)
# print results
pprint.pprint(
dataclasses.asdict(
data['acquisition_cost_range_validator']))
pprint.pprint(
dataclasses.asdict(
data['acquisition_cost_data_type_validator']))
Note there's some magic above -- we're working on improving querying and reporting. As always, if you have ideas for the API, let us know. Running the above yields the following:
WARNING:hamilton.data_quality.base:[acquisition_cost:range_validator] validator failed. Message was: Series contains 5 values in range (0,1000), and 1 outside.. Diagnostic information is: {'range': (0, 1000), 'in_range': 5, 'out_range': 1, 'data_size': 6}.
{'diagnostics': {'data_size': 6,
'in_range': 5,
'out_range': 1,
'range': (0, 1000)},
'message': 'Series contains 5 values in range (0,1000), and 1 outside.',
'passes': False}
{'diagnostics': {'actual_dtype': dtype('float64'),
'required_dtype': <class 'numpy.float64'>},
'message': "Requires subclass of datatype: <class 'numpy.float64'>. Got "
'datatype: float64. This is a match.',
'passes': True}
If you’re using the Hamilton UI - this information will be automatically surfaced to you.
Hamilton + Pandera
pandera
is an open source project that provides a flexible and expressive API for performing data validation on dataframe-like objects to make data processing pipelines more readable and robust. It contains more out of the box functionality than the simple data validators that Hamilton comes with.
Note that you have to have Hamilton installed with the pandera
extension. E.G.
pip install "sf-hamilton[pandera]"
The integration point is simple. All you have to do is provide a pandera schema using the default data validator with argument schema=
. This will validate the output against a schema provided by you.
If you don't know what a pandera schema is or haven't worked with them before, read more about it here. The integration works with schemas for both series and dataframes, across a multitude of dataframe types. E.g. polars, pandas, pyspark, etc.
Validating DataFrames
Pandera allows you specify a lot of different checks — even more than what Hamilton comes with. This is our recommended approach for validating dataframes if you can add the Pandera dependency. These can by custom or straightforward, for example:
import pandera as pa
import pandas as pd
from hamilton.function_modifiers import check_output
@check_output(
schema=pa.DataFrameSchema(
{
'column1': pa.Column(int),
'column2': pa.Column(float, pa.Check(lambda s: s < -1.2)),
# you can provide a list of validators
'column3': pa.Column(str, [
pa.Check(lambda s: s.str.startswith('value')),
pa.Check(lambda s: s.str.split('_', expand=True).shape[1] == 2)
]),
},
index=pa.Index(int),
strict=True,
),
importance="fail"
)
def dataframe_with_schema(...) -> pd.DataFrame:
...
When this function is run, Hamilton will invoke pandera for you with the provided schema and output a result.
Validating Series
Much like for dataframes, Pandera also enables one to write checks against “series/column” types:
import pandera as pa
import pandas as pd
from hamilton.function_modifiers import check_output
@check_output(
schema=pa.SeriesSchema(
str,
checks=[
pa.Check(lambda s: s.str.startswith('foo')),
pa.Check(lambda s: s.str.endswith('bar')),
pa.Check(lambda x: len(x) > 3, element_wise=True)
],
nullable=False,
),
importance="fail"
)
def series_with_schema(...) -> pd.Series:
...
You can find Pandera examples here. When execution runs, you’ll get Pandera’s nicely formatted output.
Handling the results more broadly
We utilize tags to index nodes that represent data quality. All data-quality related tags start with the prefix hamilton.data_quality
. Currently there are two:
hamilton.data_quality.contains_dq_results
-- this is a boolean that tells whether a node outputs a data quality results. These are nodes that get injected when a node is decorated, and can be queried by the end user.hamilton.data_quality.source_node
-- this contains the name of the source_node the data to which the data quality points.
Note that these tags will not be present if the node is not related to data quality -- don't assume they're in every node.
dr = driver.Builder() ... build()
all_validator_variables = [
var.name for var in dr.list_available_variables() if
var.tags.get('hamilton.data_quality.contains_dq_results')]
To query one can simply filter for all the nodes that contain these tags and access the results, as the above code snippet shows!
Future Work:
There’s lots of additions that we could add and include out of the box with Hamilton.
Pydantic support. Much like Pandera, Pydantic is often used to validate an object. We should support a similar mechanism to Pandera. This will be useful when you use Hamilton in web-services contexts for example.
Currently the actions that are taken when a validation fails are hardcoded. In the future, we will be considering adding special actions for each importance level that one can customize.
One can only disable data quality checks by commenting out the decorator. We intend to allow node-specific overrides. This would likely be combined with an lifecycle adapter.
Building out more off-the-shelf integrations with popular tools. E.g. if you're using Great Expectations, or some other vendor, integration a custom validator to ping that service is entirely possible.
Bootstrapping values. It’s often a chore to create checks. With Hamilton we could include a “profile” step that could produce the validations/assertions one wants to add to the code. Please come chat in slack if you’re interested in this.
We want to hear from you!
If you’re excited by any of this, or have strong opinions, drop by our Slack channel / leave some comments here! Some resources to help you get started:
📣 join our Hamilton community on Slack — need help with Hamilton? Ask here.
📝 leave us an issue if you find something
⭐ give us a star on the Hamilton repository if you like what we’re working on!
We recently launched Burr to create LLM agents and applications.