Investigate alternatives for task queue #127

Open
opened 2026-02-05 08:28:07 +00:00 by hurui200320 · 2 comments
Member

Currently the python adapter provides a way to send messages over RabbitMQ. However, this is not enough for a task queue, which comes with extra guarantees and requirements for distributing tasks (units of work that need processing) rather than general messages.

The current options are:

  • Celery
  • Hatchet
  • Improving existing amq python library
Author
Member

Celery

This comment contains all the findings related to celery, including a list of TODOs to make the current cleverswarm use it.

Architecture overview

Celery requires the following components:

  • Producer (cleverswarm frontend)
  • Worker (cleverswarm backend)
  • Broker (RabbitMQ)
  • Result backend (if you want to get a result via celery)
    • This one is not required since the cleverswarm backend does not return any results
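
A minimal sketch of how these pieces could map onto cleverswarm. The broker URL and task name here are placeholders; ignore_result is a real celery task option used to make the missing result backend explicit:

from celery import Celery

# Hypothetical broker URL; cleverswarm would point this at its RabbitMQ.
app = Celery('cleverswarm', broker='amqp://guest:guest@localhost:5672//')

# No result backend is configured; ignore_result=True makes that explicit,
# so celery never tries to store this task's return value.
@app.task(ignore_result=True)
def process(payload):
    ...  # the actual backend processing would go here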

Frontend

For cleverswarm, the frontend is relatively simple.

Assuming the backend module has a method annotated with @app.task, for example:

# tasks.py
from celery import Celery

app = Celery('myapp', broker='...')

@app.task
def add(x, y):
    return x + y

then you could import this method and call the delay method on it:

from my_project.tasks import add

def view_function(request):
    add.delay(4, 4)  # returns an AsyncResult
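
Note that without a result backend configured, the returned AsyncResult is only useful for its task id; trying to fetch the actual result from it would fail.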

Or, if importing the backend module into the frontend is not ideal, you can also use something like this in the frontend:

from celery import Celery

app = Celery('producer_app', broker='redis://...')

def trigger_task():
    app.send_task('my_project.tasks.add', args=[4, 4])
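
Since send_task only references the task by its dotted name, the frontend never has to import backend code; the trade-off is that the name is just a string, so a mismatch is only detected when a worker receives the message.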

Backend

Celery is pretty invasive on the backend side.

First, one must use celery -A module_name worker -l info to bring up the workers. This starts the celery worker: a set of processes that connect to the broker, listen for incoming messages, and call the registered methods (annotated with @app.task).
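
The size of that process pool can be pinned with the --concurrency option (e.g. celery -A module_name worker -l info --concurrency=4), which becomes relevant for the backpressure discussion below.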

There are several problems:

Component initialization

To initialize each worker process, for example to set up the python library for backpressure reporting, we need to define a method like this:


from celery.signals import worker_process_init

adapter = None

@worker_process_init.connect
def init_worker(**kwargs):
    # Runs once in each worker process after it is forked.
    global adapter
    adapter = ...
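
Because worker_process_init fires separately in every forked child, each process ends up with its own adapter instance rather than sharing one.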

Backpressure syncing

This is the biggest problem with celery. The celery worker relies on processes (i.e., fork), not threads. This means the backpressure service inside each task cannot be shared across processes. For the cleverswarm backend, it means we will have multiple processes reporting backpressure for the same pod, confusing the management service.

There is a way to create a shared counter. Using Bootsteps (https://docs.celeryq.dev/en/stable/userguide/extending.html), we can register a Value (from the multiprocessing module) with the worker. When the worker is forked into multiple processes, they still share this value and update it under a lock so concurrent updates stay consistent.
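
As a rough sketch of that idea (names like in_flight and SharedCounterStep are made up here; bootsteps.StartStopStep, the task_prerun/task_postrun signals, and multiprocessing.Value are the real celery/stdlib APIs involved):

import multiprocessing

from celery import Celery, bootsteps
from celery.signals import task_prerun, task_postrun

app = Celery('cleverswarm', broker='amqp://...')

# Created in the parent worker process before the pool forks, so every
# child process inherits a handle to the same shared memory.
in_flight = multiprocessing.Value('i', 0)  # 'i' = signed int

class SharedCounterStep(bootsteps.StartStopStep):
    # Hypothetical bootstep that attaches the shared counter to the worker
    # so other components (e.g. a reporter) can read it.
    def __init__(self, worker, **kwargs):
        worker.in_flight = in_flight
        super().__init__(worker, **kwargs)

app.steps['worker'].add(SharedCounterStep)

@task_prerun.connect
def task_started(**kwargs):
    with in_flight.get_lock():
        in_flight.value += 1

@task_postrun.connect
def task_finished(**kwargs):
    with in_flight.get_lock():
        in_flight.value -= 1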

For reporting, we can have multiple processes reporting for the same pod as long as they agree on the value.

However, even with this multiprocessing.Value solution, it still doesn't fit the existing backpressure management code in the python library. So we will likely need to write a dedicated library to work with celery specifically.

Author
Member

Hatchet

Hatchet is an execution engine. Compared to celery, hatchet is more like a client-side library (though you also need a hatchet server). However, hatchet lets you bring the worker up at your own pace and under your own control:

from hatchet_sdk import Hatchet
from my_workflows.welcome_flow import UserWelcomeWorkflow

def run():
    hatchet = Hatchet()

    # A worker polls the hatchet server for runs of its registered workflows.
    worker = hatchet.worker("welcome-worker-group")

    worker.register_workflow(UserWelcomeWorkflow())

    worker.start()  # blocking

if __name__ == "__main__":
    run()

It looks promising and provides stages, so progress can be tracked. But it still requires a lot of changes in cleverswarm.
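
For illustration, a hypothetical UserWelcomeWorkflow in the decorator style of the older hatchet python SDK used above; the event name and step names are made up:

from hatchet_sdk import Hatchet, Context

hatchet = Hatchet()

@hatchet.workflow(on_events=["user:created"])
class UserWelcomeWorkflow:
    # Each step is a tracked stage; hatchet records its state separately.
    @hatchet.step()
    def send_welcome_email(self, context: Context):
        return {"status": "email sent"}

    # parents= makes this step run only after send_welcome_email finishes.
    @hatchet.step(parents=["send_welcome_email"])
    def schedule_follow_up(self, context: Context):
        return {"status": "follow-up scheduled"}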

Hatchet uses asyncio, which is much, much better than the forked processes used in celery.

Hatchet also supports a concurrency limit, which restricts how many tasks a worker can pull:

Slots are the number of concurrent task runs that a worker can execute, and are configured using the slots option on the worker. For instance, if you set slots=5 on your worker, then your worker will be able to run five tasks concurrently before new tasks start needing to wait in the queue before being picked up. Increasing the number of slots on your worker, or the number of workers you run, will allow you to handle more concurrent work (and thus more throughput, in many cases).

https://docs.hatchet.run/home/workers#understanding-slots
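
A one-line sketch of that, assuming the slots keyword from the docs above (the parameter name may differ across SDK versions; older ones exposed it as max_runs):

from hatchet_sdk import Hatchet

hatchet = Hatchet()
# Cap this worker at five concurrent task runs.
worker = hatchet.worker("welcome-worker-group", slots=5)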
