Scaling Hamilton with Ray in 5 minutes
Scale the human side of your data transforms as well as the compute side
Hamilton is an open source, declarative dataflow micro-framework for Python. In this post I will explain what’s required to scale the data and compute behind your workflow by using Hamilton’s Ray integration. This post assumes prior familiarity with what Hamilton is. For the backstory and a longer introduction, we invite you to read this TDS post; for a shorter five minute introduction, see this TDS post; or for how it helps scale teams and keep their code bases organized, see this TDS post.
For those unfamiliar with Ray, it is an open source framework out of UC Berkeley that can scale Python applications. It has a growing ecosystem of tooling that helps with lots of machine learning related workflows. For example, it sells itself as enabling you to scale from your laptop to a cluster very easily, without having to change much code. In terms of real world use, I like to use Ray as a very quick way to implement multiprocessing in Python without worrying about the details!
Ray Primer
Here is a Ray primer. The good news is that you don’t have to know much about Ray to use it with Hamilton. You just need to know that it will parallelize your workflow over multiple CPU cores easily, and allow you to scale beyond your laptop if you have a Ray cluster set up. But just so you understand how Hamilton connects with it, let’s quickly go over how one would use Ray.
Ray Usage Premise
The basic premise of using Ray is that you annotate the functions you want Ray to schedule for execution. For example (from their documentation):
import ray

# This is a regular Python function.
def normal_function():
    return 1

# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_ray_function():
    return 1
Then, to execute the my_ray_function function, you would do:
my_ray_function.remote()
This tells Ray to schedule the function for execution. To run locally versus on a cluster, all you have to do is instantiate Ray differently before calling the above code.
import ray
ray.init() # local execution over multiple cores.
ray.init({... cluster details ...}) # connect to cluster.
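To make this concrete, here is a minimal, self-contained sketch of a Ray round trip; the function name and data are purely illustrative. Arguments are passed straight into .remote(), which returns a future immediately, and ray.get() blocks until the results are ready.

import ray

ray.init()  # local execution over multiple cores

@ray.remote
def add_one(x: int) -> int:
    # A hypothetical remote function; arguments go through .remote().
    return x + 1

futures = [add_one.remote(i) for i in range(4)]
print(ray.get(futures))  # [1, 2, 3, 4]

ray.shutdown()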
Now, 🤔, you might be thinking that this seems like a lot of work to get your existing code to run: how do I pass parameters to my functions? How should I change my application to make better use of Ray? Etc. Good news! You don’t have to think about any of that with Hamilton. You just write your standard Hamilton functions, and only change some “driver” code to make it run on Ray. More on that in the next section.
Hamilton + Ray
To use Ray with Hamilton, you first need to install it.
pip install "sf-hamilton[ray]"
With Hamilton, all your logic is written as Python functions, so you write your Hamilton functions as you normally would. No change here.
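For instance, a module of Hamilton functions might look like the following sketch; the column names are illustrative and mirror Hamilton’s hello world example. Nothing here knows or cares about Ray.

import pandas as pd

def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3-week average of spend."""
    return spend.rolling(3).mean()

def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
    """Spend per signup."""
    return spend / signups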
Come execution time, at the framework level, Hamilton can easily inject @ray.remote for every single function in the directed acyclic graph (DAG) your functions define. To reiterate: you don’t have to change any of your Hamilton code to make use of Ray! All you need to do to make Hamilton run on Ray is provide a “GraphAdapter” object to the Hamilton “Driver” class you instantiate.
A GraphAdapter is just a simple class that defines a few functions that let you augment how your DAG is walked and executed. See the docs for more information on them.
In terms of code to add/change, here’s what’s required to augment standard Hamilton driver code (the Ray-specific additions are called out in the comments):
import ray
from hamilton import base, driver
from hamilton.experimental import h_ray
...
ray.init() # instantiate Ray
config = {...} # instantiate your config
modules = [...] # provide modules where your Hamilton functions live
rga = h_ray.RayGraphAdapter(  # object to tell Hamilton to run on Ray
    result_builder=base.PandasDataFrameResult())
dr = driver.Driver(config, *modules, adapter=rga)  # tell Hamilton to use the adapter
df = dr.execute([...])  # execute Hamilton as you normally would
ray.shutdown()
For those unfamiliar with Hamilton: if you were to remove the Ray-specific additions (the ray import, the init/shutdown calls, and the RayGraphAdapter), that’s how you would run plain Hamilton out of the box.
Specifically we need to:
1. Instantiate Ray: ray.init().
2. Instantiate a RayGraphAdapter, passing in a ResultBuilder object, which sets what object type to return when .execute() is called on the driver.Driver object. In this example we’re specifying that it should return a Pandas data frame.
3. Pass the RayGraphAdapter as a keyword argument, adapter=rga, when creating the driver.Driver object, which tells Hamilton to augment walking the DAG and to use Ray.
4. No other changes are required; once you’re done, just shut down your connection to Ray with ray.shutdown().
Scaling is that simple!
By adding just a few lines of code, you can now:
Parallelize computation of your Hamilton functions.
Scale to running on cluster-sized data.
To recap, the recipe for using Ray with Hamilton doesn’t change much from plain vanilla Hamilton:
1. Install Hamilton + Ray: pip install "sf-hamilton[ray]".
2. Write your Hamilton functions.
3. Write your driver code, adjusting this part if you want it to run on Ray (a full sketch follows below).
4. Execute your driver script.
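Putting it all together, a complete driver script might look like this sketch. The my_functions module and the sample inputs are hypothetical stand-ins for your own code, and the requested outputs are illustrative:

import pandas as pd
import ray
from hamilton import base, driver
from hamilton.experimental import h_ray

import my_functions  # hypothetical module containing your Hamilton functions

ray.init()  # local Ray; connect to a cluster here to scale out
config = {
    # inputs your functions need; these sample values are illustrative
    "spend": pd.Series([10, 10, 20, 40, 40, 50]),
    "signups": pd.Series([1, 10, 50, 100, 200, 400]),
}
rga = h_ray.RayGraphAdapter(result_builder=base.PandasDataFrameResult())
dr = driver.Driver(config, my_functions, adapter=rga)
df = dr.execute(["avg_3wk_spend", "spend_per_signup"])  # illustrative outputs
print(df)
ray.shutdown()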
Since it’s so easy to switch to using Ray or not, we’d love some benchmarks/anecdotes to see how much switching to Ray improves the speed or scale at which you can operate your dataflow!
For a full “Ray Hello World” code sample, we direct you to the examples directory here.
To conclude
By using Hamilton, you can organize and scale the human side of writing data transforms (no, I didn’t talk about this in this post; see links in the introduction or below to convince yourself there 😉). With Ray, you can scale your data workflows beyond the limits of your laptop. Together, the sky’s the limit!
If you like Hamilton and want to follow more on the project:
we’d love a ⭐️ on github!
📣 join our fledgling community on slack!
check out our documentation 📚.
Caveats
A brief note on caveats with using Hamilton + Ray.
We are looking to graduate Ray support from “experimental”, but to do that we need your feedback! The API has been very stable (hasn’t changed since launch), but to feel good about making it permanent, we’d love to know what you think.
We don’t expose all of Ray’s functionality, but we could, e.g. memory-aware scheduling, or specifying resources for specific functions. If you want something exposed, please create an issue on GitHub: https://github.com/dagworks-inc/hamilton 🙏.
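For context, here is roughly what a per-function resource request looks like in raw Ray today; this is not something Hamilton exposes yet, and the numbers are purely illustrative.

import ray

@ray.remote(num_cpus=2, memory=2 * 1024**3)  # request 2 CPUs and ~2 GiB for this task
def heavy_transform(x):
    return x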
Related Posts
For more on Hamilton we invite you to check out:
[Future post] Hamilton + Dask in 5 minutes