Addendum: Improving our Parallel Task API with Celery
A followup to our previous discussion of running parallel tasks in Celery, with details on how we solved the largest problem with our initial implementation
Our Computational Biology software team develops several workflows for designing and executing large sets of genetic edits of microbes. These workflows are written in Python and executed asynchronously via an internal task-running service, ZWork, built using Celery.
A few months ago, we wrote about how to run parallel tasks with Celery, plus how and why we built an API on top of Celery’s task primitives. But one major problem remained: the manner in which Celery sends data to a group of workers can use a large amount of memory. This post describes how we addressed that problem.
Parallelization Problems with Celery
As described in our previous post, we successfully refactored our existing task code to run in parallel. For one of our motivating workflows, wall-clock run times dropped from one to four minutes before parallelization to five to fifteen seconds after.
However, we also learned that the way Celery sends data to a group of workers can use a large amount of memory. To see why, recall that our implementation consists of:
- a setup step to split the work into tasks;
- several instances of a process step, one to execute each task; and
- a join step to collect and combine results from all the process steps.
Every task in the process step receives the same, complete input data and then works only on its own slice. In other words, the full input is copied once per task, so the total amount of data sent to workers grows in direct proportion to the number of chunks the work is split into.
The following diagram, using an example of summing a list of numbers, both shows the pattern we’re using and illustrates how this data replication occurs:
The way Celery sends data to a group can use a large amount of memory. In this example of summing a list of numbers, the entire input is duplicated for each worker.
This, of course, can be massively inefficient. For many of our tasks, though, the inputs are object ids: we passed the full list of ids to every parallel worker, and each worker then fetched from the database only the objects it needed. Because this data was reasonably sized, we initially decided to live with the duplication. However, some tasks used large, nested data structures instead of object ids, so we had to find a solution quickly.
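Duplicating ids is tolerable because ids are small. The sketch below is again plain Python. A dict stands in for the database, and `DATABASE` and `process_ids` are hypothetical names used only for illustration. The worker receives every id but loads only the objects in its own slice, so each large object is fetched by just one worker.

```python
from typing import Dict, List

# Hypothetical stand-in for the database: object id -> (potentially large) object.
DATABASE: Dict[int, dict] = {i: {"id": i, "value": i * 10} for i in range(1, 9)}


def process_ids(all_ids: List[int], index: int, n_tasks: int) -> int:
    """Receives every id (cheap to copy) but fetches only its own objects."""
    my_ids = all_ids[index::n_tasks]
    objects = [DATABASE[obj_id] for obj_id in my_ids]  # per-task fetch
    return sum(obj["value"] for obj in objects)


ids = sorted(DATABASE)
partials = [process_ids(ids, index, n_tasks=2) for index in range(2)]
print(partials, sum(partials))  # [160, 200] 360
```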
Avoiding the Data Duplication
To avoid this duplication of data in proportion to the number of workers, we changed the way we assign work in the fork-join pattern.
We no longer pass the full results of the setup step to every worker in the process step, which is what had been dramatically increasing our memory consumption. The main trick was to define a new Celery task that dynamically constructs and executes a group of process steps based on the size of the setup step’s output.
This approach is an unusual use of Celery. Normally, a step is responsible only for operating on its incoming data and returning its results. The coder defines how all the steps are wired together, and Celery takes care of the nitty-gritty of asynchronously starting each step and making sure the data is routed correctly. In our case, we’ve introduced an intermediate step that plays the role of the coder: it decides the size and shape of the steps that follow it and delegates work to them. It does this by defining a Celery chord whose header is a group containing the process steps we want to run in parallel, and whose callback is the join step.
Summing a list of numbers with our modified approach, which avoids unnecessary duplication of input data.
See the gist of the code we used to do this here.
Since this is such an unusual use, we can’t rely on Celery to automatically invoke the downstream steps with the correct data. We have to do it ourselves. That is why the intermediate step is both defining and executing the steps that come after it.
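The intermediate step’s job can be sketched in plain Python, with no Celery dependency. The hypothetical `plan_chord` function sizes the header from the setup output and pairs each process step with only its own chunk. It returns that header together with the join step as the callback. In the real system these would be Celery signatures assembled into a chord. Here, a sequential loop stands in for executing them, just to check the plan.

```python
from typing import Callable, List, Tuple

Header = List[Tuple[Callable[[List[int]], int], List[int]]]


def process(chunk: List[int]) -> int:
    """Process step: works only on the chunk it is handed."""
    return sum(chunk)


def join(partials: List[int]) -> int:
    """Join step: combines the partial results."""
    return sum(partials)


def plan_chord(
    setup_output: List[int], chunk_size: int
) -> Tuple[Header, Callable[[List[int]], int]]:
    """Intermediate step: size the header group from the setup output.

    Each header entry pairs the process step with ONLY its own chunk, so the
    input is split across tasks instead of being copied to each of them.
    """
    header = [
        (process, setup_output[i:i + chunk_size])
        for i in range(0, len(setup_output), chunk_size)
    ]
    return header, join


numbers = list(range(1, 101))
header, callback = plan_chord(numbers, chunk_size=30)

# Run the plan sequentially to check it; in Celery the header would run in parallel.
result = callback([step(chunk) for step, chunk in header])
print(len(header), sum(len(chunk) for _, chunk in header), result)  # 4 100 5050
```

Because each header entry carries only its own chunk, the items sent across all process steps add up to the size of the input, however many chunks the intermediate step decides to create.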
A potential point of confusion is that the intermediate step almost looks as if it blocks while waiting for the chord to finish, which Celery strongly advises against. Making synchronous calls to subtasks is inefficient and can lead to deadlock when you are running low on available workers.
In this scenario, however, the chord is invoked asynchronously, so the intermediate step does not wait for the chord to complete before finishing itself. In fact, this pattern is similar to how Celery itself initiates downstream steps, and, at least according to Stack Overflow, it is officially blessed by the creator of Celery.
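The reason the intermediate step does not block can be modeled with the standard library. This is a model of the idea, not Celery’s implementation. In this sketch, `concurrent.futures.ThreadPoolExecutor` stands in for the Celery worker pool, and the hypothetical `launch_chord` function mimics chord semantics. It submits the header tasks, arranges for the callback to run when the last one finishes, and returns a `Future` immediately instead of waiting on any result.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


def launch_chord(
    chunks: List[List[int]],
    process: Callable[[List[int]], int],
    callback: Callable[[List[int]], int],
    pool: ThreadPoolExecutor,
) -> Future:
    """Submit one header task per chunk and return a Future right away.

    The callback (join step) runs in whichever worker thread finishes the
    last header task; the caller never waits on header results.
    """
    outcome: Future = Future()
    if not chunks:
        outcome.set_result(callback([]))
        return outcome

    partials: List[int] = [0] * len(chunks)
    remaining = len(chunks)
    lock = threading.Lock()

    def on_done(index: int, fut: Future) -> None:
        nonlocal remaining
        exc = fut.exception()
        with lock:
            if outcome.done():  # an earlier header task already failed
                return
            if exc is not None:
                outcome.set_exception(exc)
                return
            partials[index] = fut.result()
            remaining -= 1
            if remaining == 0:  # last header task: fire the callback
                try:
                    outcome.set_result(callback(partials))
                except Exception as err:
                    outcome.set_exception(err)

    for i, chunk in enumerate(chunks):
        fut = pool.submit(process, chunk)
        fut.add_done_callback(lambda f, i=i: on_done(i, f))
    return outcome


# Demo: hold every worker on an Event to show that launch_chord returns first.
release = threading.Event()


def slow_process(chunk: List[int]) -> int:
    release.wait()
    return sum(chunk)


with ThreadPoolExecutor(max_workers=4) as pool:
    handle = launch_chord([[1, 2], [3, 4], [5]], slow_process, sum, pool)
    returned_early = not handle.done()  # workers are still blocked here
    release.set()
    print(returned_early, handle.result(timeout=5))  # True 15
```

Because the join runs as a completion callback rather than inside a caller that waits, no worker sits idle holding a slot while its subtasks queue up. That is the deadlock risk of synchronous subtask calls.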
Results So Far
The main win from this change is that each worker receives only the input it needs, not a full copy of the setup step’s results. Because our memory footprint no longer grows in proportion to the number of workers, we’ve been able to run workflows an order of magnitude larger without any memory issues.
We’ve now been running this updated version of our Celery parallel task infrastructure in our production environments for over nine months. Not only are we seeing a 10x speedup from parallelizing our workflows, but we can also accommodate a wider variety of workflows while keeping our memory footprint from growing with the number of workers. Win-win!
For additional background on the original problem, or for details on the API we built on top of our Celery infrastructure to make writing parallel tasks even easier, see our previous post.
Matthew Batterton is a Software Engineering Manager and Zach Palchick is a Staff Bioinformatics Software Engineer. They both work on Computational Biology teams at Zymergen. Molly Jones, a former Technical Writer at Zymergen, also contributed to this post.