RAG: ingestion and chunking using Hamilton and scaling to Ray, Dask, or PySpark
In this post we show how one can write a document ingestion RAG pipeline locally and then with minimal changes, make it scale on to systems like Ray, Dask, or PySpark.
This post came about from a Hamilton user conversation with Mathew Goldsborough, the Founder and CEO of ThreadScribe.ai.
Quick RAG Recap
Retrieval Augmented Generation, or simply RAG, has two components that largely operate independently: ingestion and inference. Ingestion is required for inference to operate, because it populates a database/index which is then queried as part of the inference pipeline. For those unfamiliar with RAG, we invite users to see our reference post and example on a RAG architecture operating from a web service. In this post we present how you can use Hamilton to build a process/job/task that ingests documents for RAG purposes, and then scale it accordingly to what your production needs are.
Note: to keep this example and post simple, we will focus on just document ingestion and chunking. Extending the code to also compute embeddings and push to a vector store is a straightforward exercise with what we present here.
Document Ingestion is a DAG
The premise of retrieval augmented generation (RAG) is that it needs “documents” in a format that enables one to query to find them and then retrieve them. In an enterprise production setting, ingestion could mean processing 10,000+ documents for this purpose. Operationally, you’ll likely iterate and change things over time: e.g. changing the chunking strategy, size of a chunk (how much text do you embed at a single time), the embedding process, the vector store, etc. Then for every change/update/tweak potentially reprocessing your documents. Overtime, if you do this sub-optimally, it’ll result in code & pipelines that are hard to manage, debug, and that take a long time to run.
Thankfully, document processing pipelines are very amenable to being modeled as a directed acyclic graph (DAG). Which is exactly what Hamilton was built to model in a low abstraction manner; Hamilton is perfect for writing modular, testable, and self-documenting code. It was built back in 2019 at Stitch Fix to facilitate the production software development lifecycle without sacrificing development speed, and is used in many places in production (Stitch Fix, Tradewell Technologies, Coinbase, IBM, AWS, UK GDS, etc. to name a few).
At its core, Hamilton standardizes the way users write Python code by writing functions to express a dataflow or DAG. Each function defines a transformation and its parameters indicate its dependencies. Hamilton automatically connects individual functions into a Directed Acyclic Graph (DAG) that can be executed, visualized, optimized, and reported on:
For those new to Hamilton we point readers to the many articles on it (e.g. origin story, ML pipeline reference, Example RAG architecture, Production prompt engineering, etc.) as well as
https://www.tryhamilton.dev/. Otherwise we assume basic familiarity with Hamilton for the rest of this post.
Example: Scraping website documentation and processing that into chunks
To motivate the code in this post, we’ll pretend we’re building a chat bot application, i.e. a RAG system to ask questions of Hamilton’s Documentation. We want to be able to help users answer questions on how to use Hamilton based on the content available in the documentation. For those that want to see all the code for this post, it can be found here.
What we want the code to do
Let’s layout the processing pipeline we’d want:
Get a sitemap.xml – most docs have this for search engine optimization (SEO).
For each url from the sitemap.xml:
Download it
Extract the relevant HTML, given some context
Chunk the text, given a chunking strategy
[Embed the chunk]
[Send (chunks, embeddings, metadata) to vector database]
For this example we’ll skip steps (d) and (3) as mentioned earlier to simplify this example.
Some of you might be thinking that this isn’t a DAG, there’s a for-loop, how does Hamilton apply? That’s true at first glance, but really what the loop is doing, is a map operation over each document. So what’s in the loop is a DAG, and the for-loop is just a parameterization of that DAG, so still a DAG.
Let’s sketch out the DAG that processes a given URL. Naively it could be three lines that delegate to other functions/objects in a linear fashion:
html = requests.get(url)
article = extract_article(html)
chunked_text = chunk_article(article)
Other than a “hello world” example, your processing pipeline won’t look like this. Specifically let’s add some more realities into our example:
The content of interest is found in a particular element of the HTML. This logic could change depending on where we’re pulling from.
The chunking strategy should be pluggable and modular.
We want to store the text and metadata about it.
While we won’t get into it here, it's typical to see an object oriented approach to the above three lines. We think Hamilton is a better abstraction here, because you’re dealing with simpler constructs to maintain and follow – plain old python functions – which is how Hamilton standardizes things.
Some Hamilton Code
Let’s now see the Hamilton code that can chunk a given documentation URL. I’ll break it down into sections for readability.
Pulling and extracting the article using a regex (doc strings shortened on purpose):
def article_regex() -> str:
"""This assumes you're using the furo theme for sphinx"""
return r'<article role="main" id="furo-main-content">(.*?)</article>'
def article_text(url: str, article_regex: str) -> str:
"""Pulls URL and takes out relevant HTML."""
html = requests.get(url)
article = re.findall(article_regex, html.text, re.DOTALL)
if not article:
raise ValueError(f"No article found in {url}")
text = article[0].strip()
return text
Pluggable chunking strategy – we can easily test and swap logic by changing the inside of the functions:
def html_chunker() -> text_splitter.HTMLHeaderTextSplitter:
"""Return HTML chunker object."""
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
("h3", "Header 3"),
]
return text_splitter.HTMLHeaderTextSplitter(
headers_to_split_on=headers_to_split_on)
def text_chunker(
chunk_size: int = 256, chunk_overlap: int = 32
) -> text_splitter.RecursiveCharacterTextSplitter:
"""Returns the text chunker object."""
return text_splitter.RecursiveCharacterTextSplitter(
chunk_size=chunk_size, chunk_overlap=chunk_overlap
)
def chunked_text(
article_text: str,
html_chunker: text_splitter.HTMLHeaderTextSplitter,
text_chunker: text_splitter.RecursiveCharacterTextSplitter,
) -> list[documents.Document]:
"""This function takes in HTML, chunks it, and then chunks it again."""
header_splits = html_chunker.split_text(article_text)
splits = text_chunker.split_documents(header_splits)
return splits
A summary type object to capture information we can then use for follow-on purposes, e.g. embedding, pushing to a vector store, etc.
def url_result(url: str, article_text: str, chunked_text: list[documents.Document]) -> dict:
"""Function to aggregate what we produced.
Note: this function is where you could cache the results to a datastore, etc.
"""
return {"url": url, "article_text": article_text, "chunks": chunked_text}
This together then forms a DAG that looks like the following (image courtesy of Hamilton):
Handling the map/loop part
To complete the DAG we need to pull the sitemap and then loop over each URL, processing it. You could handle the parameterization of this DAG in a few ways. Here we’ll show you how to do so with Parallelizable and Collect constructs that Hamilton has (see our post last year on it):
A function with a return type annotation of Parallelizable[TYPE] indicates to Hamilton that we want to map over the output, which could be run in parallel. A function with a parameter annotated with Collect[TYPE], indicates to Hamilton that it should stop the map, and collect (i.e. reduce) the results. So any Hamilton code that is linked between a Parallelizable and Collect, will be run multiple times, each with a different value.
So the code (skipping what we showed above) would look like this using Parallelizable and Collect:
def sitemap_text(sitemap_url: str = "https://hamilton.dagworks.io/en/latest/sitemap.xml") -> str:
"""Takes in a sitemap URL and returns the sitemap.xml file."""
sitemap = requests.get(sitemap_url)
return sitemap.text
def urls_from_sitemap(sitemap_text: str, api_key: Secret) -> list[str]:
"""Takes in a sitemap.xml file contents and creates a list of all the URLs"""
urls = re.findall(r"<loc>(.*?)</loc>", sitemap_text)
return urls
def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
"""Takes in a list of URLs for parallel processing."""
for url in urls_from_sitemap[0:max_urls]:
yield url
# The previous Hamilton code could live here, or if in another module, Hamilton
# would stitch the graph together correctly.
def collect_processed_articles(url_result: Collect[dict]) -> list:
"""Function to collect/reduce the results from parallel processing."""
return list(url_result)
Therefore the final DAG looks like this:
The green and red demarcate the parallel block, along with the crows feet on the arrows signifying multiplicity.
Running the code
Running the above code isn’t all too different from running regular Hamilton code. We just need to flip a flag on, and by default Hamilton will use Multithreading to parallelize any work between a Parallelizable and Collect.
from hamilton import driver
import doc_pipeline # the module where the above code lives
dr = (
driver.Builder()
.with_modules(doc_pipeline)
# you need to set this to tell Hamilton to use the parallel & collect;
.enable_dynamic_execution(allow_experimental_mode=True)
.build()
)
result = dr.execute(
["collect_chunked_url_text"],
inputs={"chunk_size": 256, "chunk_overlap": 32},
)
If you want to play with this yourself, you can get started with this example in three lines from the hub.
Now this works well for a single machine. But what if in production you have a lot of data to deal with, or perhaps you have a cluster at your disposal? Let’s see what you can do in the next section.
Scaling Our Pipeline to Production
When building a RAG system it's not uncommon to reprocess documents because of things changing (chunk size, embeddings changing, new documents, etc), or perhaps you are developing locally on a small subset, and then when you get to production you need to run on a whole lot more. If you are limited to a single machine this could become a bottleneck. What are you to do? You need to scale to something that can run on some sort of cluster. How do we change the above code to do that? For Ray & Dask, it’s effectively one line of code. For PySpark, it’s a few lines more. In any case, it's fairly straightforward. Why? Hamilton helps you to write your core dataflow, i.e. DAG, logic independent from the execution context in a standardized way, which makes it easy to swap where the code actually runs. Let me show you. Note: we will be showing excerpts from the code that can be found here.
Ray or Dask
Using either Ray or Dask is just a matter of switching one line really.
Ray:
To get our pipeline scaling onto Ray, we just need to import the Hamilton extension, start/connect to Ray, and then pass in the RayTaskExecutor. Nothing else needs to change.
from hamilton.plugins import h_ray
ray.init()
dr = (
driver.Builder()
.with_modules(doc_pipeline)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_config({})
.with_remote_executor(
h_ray.RayTaskExecutor() # <--- this what you add
) # be sure to run ray.init() or pass in config.
.build()
)
dr.display_all_functions("pipeline.png")
result = dr.execute(
["collect_chunked_url_text"],
inputs={"chunk_size": 256, "chunk_overlap": 32},
)
...
ray.shutdown()
Dask:
To get our pipeline scaling onto Dask, we just need to import the Hamilton extension, start/connect to Dask, and then pass in the DaskExecutor. Nothing else needs to change.
from hamilton.plugins import h_dask
cluster = distributed.LocalCluster()
client = distributed.Client(cluster)
remote_executor = h_dask.DaskExecutor(client=client)
dr = (
driver.Builder()
.with_modules(doc_pipeline)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_config({})
.with_remote_executor( # this what you add below
h_dask.DaskExecutor(client=client)
)
.build()
)
result = dr.execute(
["collect_chunked_url_text"],
inputs={"chunk_size": 256, "chunk_overlap": 32},
)
...
client.shutdown()
Caveats
Both Ray & Dask have similar caveats with respect to a few items:
Serialization: things going into what’s being parallelized and coming back out of need to be serializable. This is normal for anything that requires objects to be passed between process boundaries.
Concurrency/parallelism: it is limited by the configuration/setup of the Ray or Dask “cluster” it is being connected to (both can run locally on a machine just fine).
Failures: you need to handle this yourself – but it’s easy with Hamilton and the way things are written to write robust unit tests for code that handles edge cases.
Memory: the “collect” (or reduce) step, currently brings outputs into memory from all the parallel tasks. If you run into out of memory errors, it means you need a bigger machine, or that you should probably store large objects in the parallel process and then just pass pointers as an output.
PySpark
Scaling our pipeline onto PySpark requires a few adjustments because we need to use spark dataframes, and they have type restrictions. To get our pipeline running on spark we therefore need to:
Get the data into a PySpark dataframe.
Adjust some of the functions because they are types that can’t be made into PySpark columns.
For (1) see below, for (2) we moved the “map” functions out into a separate module for more clarity in the code. Specifically the code below shows a PySpark pipeline that gets the data, creates a PySpark dataframe, adds some columns to, i.e. adds the text chunks to the dataframe.
def spark_session(app_name: str) -> ps.SparkSession:
return ps.SparkSession.builder.appName(app_name).getOrCreate()
def sitemap_text(sitemap_url: str = "https://hamilton.dagworks.io/en/latest/sitemap.xml") -> str:
"""Takes in a sitemap URL and returns the sitemap.xml file.
:param sitemap_url: the URL of sitemap.xml file
:return:
"""
try:
sitemap = requests.get(sitemap_url)
except Exception as e:
raise RuntimeError(f"Failed to fetch sitemap from {sitemap_url}. Original error: {str(e)}")
return sitemap.text
def urls_from_sitemap(
sitemap_text: str, spark_session: ps.SparkSession, num_partitions: int = 4
) -> ps.DataFrame:
"""Takes in a sitemap.xml file contents and creates a df of all the URLs in the file.
:param sitemap_text: the contents of a sitemap.xml file
:return: df of URLs
"""
urls = re.findall(r"<loc>(.*?)</loc>", sitemap_text)
df = (
spark_session.createDataFrame(urls, "string")
.toDF("url")
.repartition(numPartitions=num_partitions)
)
return df
# with_columns makes some assumptions:
# (a) that all functions in the with_columns subdag take in some part of the dataframe
# (b) that all intermediate functions in the with_columns subdag need to become columns in the dataframe
@h_spark.with_columns(
*doc_pipeline.spark_safe,
select=["article_text", "chunked_text"],
columns_to_pass=["url"],
)
def chunked_url_text(urls_from_sitemap: ps.DataFrame) -> ps.DataFrame:
"""Creates dataframe with chunked text from URLs appended as columns.
I.e. `urls_from_sitemap` declares the dependency, and then
`with_columns` runs and appends columns to it, and then the
internal part of this function is called.
:param urls_from_sitemap:
:return:
"""
# this has the new columns article_text and chunked_text
return urls_from_sitemap
The “map” operations are run by the with_columns
decorator as UDFs, and are appended onto the DataFrame - specifically only article_text
and chunked_text
are added.
The changes we need to make to the “map” code are:
Change any return types into something that can become a spark column.
For those functions we can’t change return types, we need to know that they shouldn’t be used in the `with_columns` decorator.
Concretely in our example, we can’t create a column that is of type list of langchain documents, so we need to convert them to a string (or a struct object). We chose a JSON string here. We then explicitly curate which functions can then be turned into columns, and that is what is referenced in the with_columns
decorator above.
def chunked_text(
article_text: str,
html_chunker: text_splitter.HTMLHeaderTextSplitter,
text_chunker: text_splitter.RecursiveCharacterTextSplitter,
) -> list[str]:
"""This function takes in HTML, chunks it, and then chunks it again.
It then outputs a list of langchain "documents". Multiple documents for one HTML header section is possible.
:param article_text:
:param html_chunker:
:param text_chunker:
:return: need to return something we can make a PySpark column with
"""
header_splits = html_chunker.split_text(article_text)
splits = text_chunker.split_documents(header_splits)
# TODO: make this a struct field compatible structure
return [json.dumps(s.to_json()) for s in splits]
# this is a helper variable that we use to tell `@h_spark.with_columns` decorator which functions we want
# it to create the subdag with that will take in and operate over the dataframe depended on.
spark_safe = [
article_regex,
article_text,
chunked_text,
]
Note: to debug output, one nice thing with PySpark is that you can request intermediate nodes via the select=
parameter and have them appended to the dataframe. This means it’s easy to check the output of intermediate steps in the pipeline easily if you wanted to.
Caveats
There are a few caveats:
Serialization. Objects need to be serializable, much like in the above Ray/Dask case, if they’re being sent to and from executors.
Each python function in the pipeline that can be turned into a PySpark column is run as a UDF. Hamilton will make sure the order of operations is correct.
Failures: you need to ensure the code being run can gracefully handle failures yourself.
Concurrency/Parallelism: you need to (a) partition the input dataframe appropriately for the parallelism you want, and then (b) also have that many executors available for computation.
Extensions:
So this pipeline, while realistic, is still a simple case. You might want to think about the following concerns if you’re building and maintaining a production pipeline.
Adding embedding computation
Embeddings for a text chunk are a map operation as well. So extending the pipeline with more functions is a straightforward task. You can either add them to extend the parallelizable & collect block. Or, add a new parallelizable & collect block extending from the end of the current one.
Pushing to a datastore
Once you have embeddings you need to store them. You could do this inside the parallelizable & collect block as well, or do it in the function that collects the results to be able to store them in batches.
Recomputing only what has changed
In our case above, if the documentation changes, we should only want to recompute only those documents that have changed, and not the entire corpus. To do that, one would need to either include a filter step, or check before processing a URL whether the contents of it has changed. This can typically be achieved by taking a hash and having it stored somewhere, that you can then retrieve and compare.
Observability & Debugging
Hamilton comes with lifecycle APIs that one can provide implementations for. This enables you to come up with your own implementations for monitoring and debugging. If you want something off the shelf there is a datadog integration, as well as DAGWorks (there’s a free tier!), that will provide you with a lot more.
What about that document chatbot?
If you join Hamilton’s slack, you’ll be able to chat with it. It’s called @ThreadScribe
, powered by Threadscribe.ai. If you’re after a Slack Bot that can help answer questions from your documentation, or summarize threads for creating issues, you should check out Threadscribe.ai! Fun fact: their document ingestion is powered by Hamilton and monitored with DAGWorks.
TL;DR: what we covered in this post
We covered:
How to write a document processing pipeline in Hamilton and run it locally.
Showed a few choices (Ray, Dask, PySpark) as options to take the code you wrote and scale it for production. Because this is so easy to do, this will shrink the time it takes to go from development to production! See the example code here or use it in three lines of code from the hub to pull the code and get started.
Gave pointers for likely next steps to complete the RAG picture. E.g. adding embeddings, pushing to a vector store, links for observability where you can build your own tools, or use DAGWorks for off-the-shelf insights - sign up for free to check it out here.
For me, the bigger picture is, that we showed with Hamilton that you can write simple python functions and then easily scale the code for production purposes. If you have an existing RAG pipeline, we’d love your thoughts and comparisons; with Hamilton your code is always unit testable and documentation friendly, and with the visualization can easily provide a clear sense of provenance for your pipeline. How does yours compare? 😎
FAQ
Q: How does Hamilton compare to other workflow/pipeline tools like Prefect, Airflow, etc?
A: Hamilton is just a library that is designed specifically for building modular, testable data transformation pipelines using plain Python functions. It standardizes how you write data processing code by having you define transformations as functions. This makes the code easier to develop, test, and reason about compared to tools that use configuration files or higher-level abstractions. Tools like Prefect and Airflow are heavy weight systems that handle scheduling and provisioning of resources. You wouldn’t replace them with Hamilton, but Hamilton will happily run inside of them!
Q: Can I use Hamilton for just part of my pipeline rather than the full ingestion process?
A: Yes, absolutely. Hamilton allows you to break down your pipeline into reusable, modular pieces of code. You can use Hamilton for just the document chunking part, just the embedding generation, or any other subset of functionality within the overall ingestion pipeline.
Q: How does Hamilton handle failures and retries?
A: Hamilton itself does not have built-in retry or failure handling mechanisms. However, since the code is just Python functions, you can implement your own robust error handling and retries based on your specific requirements within each function.
Q: Does Hamilton support incremental processing of new/updated documents?
A: Incremental processing is not a core Hamilton feature, but it can be implemented within your pipeline code. For example, you could check metadata like file hashes to determine if a document needs to be re-processed before ingesting it.
Q: How does Hamilton's performance compare to other solutions?
A: Hamilton is designed to make it easy to scale your pipelines across multithreading, Ray, Dask, Spark and other parallel execution engines. Compared to monolithic pipelines, Hamilton's modular approach can make it easier to identify and parallelize the most expensive steps of your workload.
Q: What are the main benefits of using Hamilton over rolling my own custom framework code?
A: The primary benefits are that you don’t have to maintain a custom bespoke framework. Hamilton has a lot of people using it and is battle tested. Hamilton has specific design goals that result in increased modularity, testability, and the ability to easily scale across different parallel execution engines while writing lightweight, standardized Python functions. Hamilton also provides utilities for visualizing and reporting on pipelines that you don’t need to build yourself.