Understand how celery works by building a clone. (15 July 2019)
Intro
A delayed job processor (also called a background processor, asynchronous task queue, etc.) is a software system that can run code at a later time. Examples of such software include Celery, Resque and Sidekiq, among others. In this blogpost we will try and understand how these things work by building a clone/replica of such software, which we will call backie.
When do we need to use a background task processor? Take the example of an e-commerce website: when someone makes an order (by clicking on the order submit button), the e-commerce website needs to:
- Show a confirmation message of the order
- Persist that order to a database
- Emit an order metric event
- Send out a confirmation email to the customer
- etc
Of those steps, sending out the confirmation email is a good candidate to run in the background: it can take a long time (it may block on network IO) and the customer does not need to wait for it to finish. This is the kind of work a background task processor handles, and the general workflow looks like this:
- You have a function that contains some code that can take a long time to run, maybe because it will get blocked on IO
- You annotate that function with some functionality provided by the 'background job' library
- When you execute your function, instead of it running synchronously, the background job library will 'take' your function and serialize it to an object (eg a string)
- The background job library then takes that serialized object and stores it someplace (eg in a database)
- Then at a later time, workers (provided by the background job library) will take the serialized object from the store and convert/deserialize it to its original form
- Then use that to execute your original function
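Here is a minimal sketch of that workflow in plain Python, before we build the real thing; the send_email function and the hard-coded arguments are just for illustration:
# workflow_sketch.py
# A minimal sketch: serialize a function call, store the string, run it later.
import json

def send_email(order_id, email_address):
    print("sending order {0} confirmation to {1}".format(order_id, email_address))

# 'delay' the call: serialize the arguments instead of running the function now
stored = json.dumps({"args": ["24dkq40", "example@example.org"], "kwargs": {}})
# ... the string would normally be written to a store such as redis ...

# later, a 'worker' reads the string back and executes the original function
call = json.loads(stored)
send_email(*call["args"], **call["kwargs"])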
(a) Base-Task
We will create a base task/job class that all users of backie will have to subclass in order to create tasks that can be run in the background.
# backie/task.py
import abc
import json
import uuid

from .broker import Broker


class BaseTask(abc.ABC):
    """
    Example Usage:
        class AdderTask(BaseTask):
            task_name = "AdderTask"

            def run(self, a, b):
                result = a + b
                return result

        adder = AdderTask()
        adder.delay(9, 34)
    """

    task_name = None

    def __init__(self):
        if not self.task_name:
            raise ValueError("task_name should be set")
        self.broker = Broker()

    @abc.abstractmethod
    def run(self, *args, **kwargs):
        # put your business logic here
        raise NotImplementedError("Task `run` method must be implemented.")

    def delay(self, *args, **kwargs):
        try:
            task_id = str(uuid.uuid4())
            _task = {"task_id": task_id, "args": args, "kwargs": kwargs}
            serialized_task = json.dumps(_task)
            self.broker.enqueue(queue_name=self.task_name, item=serialized_task)
            print("task: {0} successfully queued".format(task_id))
        except Exception:
            raise Exception("Unable to publish task to the broker.")
So, if you want to create a task that will be processed in the background, you import the BaseTask from backie, create a subclass of it and also provide an implementation of the run method.
To call your task, you use the delay method with appropriate arguments. When you do that, the BaseTask will serialize those arguments to json and store them in the provided broker/store (BaseTask.broker).
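For example, the AdderTask from the docstring above could live in its own module like this (the file name adder_task.py is just for illustration):
# adder_task.py
from backie.task import BaseTask

class AdderTask(BaseTask):
    task_name = "AdderTask"

    def run(self, a, b):
        result = a + b
        return result

if __name__ == "__main__":
    adder = AdderTask()
    # this does not run the addition; it queues the serialized arguments in the broker
    adder.delay(9, 34)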
(b) Broker
Our BaseTask refers to a broker that is used to store the tasks. Let's implement that broker.
# backie/broker.py
import redis  # pip install redis


class Broker:
    """
    use redis as our broker.
    This implements a basic FIFO queue using redis.
    """

    def __init__(self):
        host = "localhost"
        port = 6379
        password = None
        self.redis_instance = redis.StrictRedis(
            host=host, port=port, password=password, db=0, socket_timeout=8.0
        )

    def enqueue(self, item, queue_name):
        self.redis_instance.lpush(queue_name, item)

    def dequeue(self, queue_name):
        dequeued_item = self.redis_instance.brpop(queue_name, timeout=3)
        if not dequeued_item:
            return None
        dequeued_item = dequeued_item[1]
        return dequeued_item
In this case we are using redis as our backing broker/store, but you can use any other store to do that. You could store the tasks in memory, in a database, in the filesystem; you could even store them in a blockchain if you want to stay current with the hype.
What you use as the backing store does not matter; as you can see, the BaseTask serializes the task arguments to a json string and that is what is stored. Find yourself a place that can store json strings and you are good to go.
This is how celery is able to support different brokers. When you do task.delay(3, 5), Celery serializes all the task arguments to json and sends them to a broker for storage.
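To make that concrete, here is a sketch of an alternative broker that keeps the serialized tasks in memory; the InMemoryBroker name is illustrative and such a broker is only useful within a single process (eg in tests):
# in_memory_broker.py
# A sketch of a broker that stores the serialized tasks in memory.
import collections

class InMemoryBroker:
    def __init__(self):
        # one FIFO queue per queue_name, mirroring the redis lpush/brpop pattern
        self.queues = collections.defaultdict(collections.deque)

    def enqueue(self, item, queue_name):
        self.queues[queue_name].appendleft(item)

    def dequeue(self, queue_name):
        try:
            return self.queues[queue_name].pop()
        except IndexError:
            return None  # nothing queued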
(c) Worker
Now that the task arguments (and any other metadata like task_id) have been stored in the broker, we need to actually run those tasks.
We do that via a worker; let's implement one:
# backie/worker.py
import json


class Worker:
    """
    Example Usage:
        task = AdderTask()
        worker = Worker(task=task)
        worker.start()
    """

    def __init__(self, task) -> None:
        self.task = task

    def start(self):
        while True:
            try:
                _dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
                if _dequeued_item is None:
                    # the queue is empty (brpop timed out); try again
                    continue
                dequeued_item = json.loads(_dequeued_item)
                task_id = dequeued_item["task_id"]
                task_args = dequeued_item["args"]
                task_kwargs = dequeued_item["kwargs"]
                print("running task: {0}".format(task_id))
                self.task.run(*task_args, **task_kwargs)
                print("successful run of task: {0}".format(task_id))
            except Exception:
                print("Unable to execute task.")
                continue
The worker takes a task instance as an argument. Then in a loop (in the start method), it dequeues a task from the broker (skipping empty reads when the queue has nothing in it), json deserializes that task to get the task arguments, then uses those arguments to call the task.
It is the work of celery workers to dequeue a task from the broker, deserialize its arguments/metadata and execute the original function with those arguments.
Usage
We have implemented the background task processor (backie) that is similar to celery, so how do we use it?
Going back to our previous example of an e-commerce site, let's use backie to implement the various tasks/jobs that need to be run.
We will need a task to persist that order to its database, another to emit an order metric event and lastly one that sends out a confirmation email to the customer.
For the sake of brevity, we will only look at the task that sends out emails after ordering.
# ecommerce_tasks.py
from backie.task import BaseTask

import requests  # pip install requests


class EmailTask(BaseTask):
    """
    task to send email to customer after they have ordered.
    """

    task_name = "EmailTask"

    def run(self, order_id, email_address):
        # let's pretend httpbin.org is an email service provider
        url = "https://httpbin.org/{0}/{1}".format(order_id, email_address)
        print(url)
        response = requests.get(url, timeout=5.0)
        print("response:: ", response)


if __name__ == "__main__":
    order_id = "24dkq40"
    email_address = "example@example.org"

    email_task = EmailTask()
    email_task.delay(order_id, email_address)
And then we also need to implement the workers:
# ecommerce_worker.py
from ecommerce_tasks import EmailTask

from backie.worker import Worker


if __name__ == "__main__":
    email_task = EmailTask()

    # run workers
    worker = Worker(task=email_task)
    worker.start()
Putting it all together, let's run a redis server (using docker) in one terminal:
docker run -p 6379:6379 redis:5.0-alpine
And in another terminal we run the tasks:
python ecommerce_tasks.py
task: 4ed63e42-f614-4093-a654-46211d5de8cf successfully queued
In a third terminal run the workers:
python ecommerce_worker.py
running task: 4ed63e42-f614-4093-a654-46211d5de8cf
https://httpbin.org/24dkq40/example@example.org
successful run of task: 4ed63e42-f614-4093-a654-46211d5de8cf
Conclusion
That was a brief overview of how some (most?) background task processors work.
The principle behind how they work is very simple: turn a task's arguments into a string, store the string in a database of sorts, retrieve that string from the database at a later time and feed those arguments to the actual task to run.
Of course, on top of that simple idea, these task processors layer other useful features like retries, periodic tasks, chaining of tasks, etc.
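As a taste of how one such feature could be layered on top of this simple idea, here is a sketch of retries; the retry counter stored alongside the task and the MAX_RETRIES constant are assumptions made for illustration, not how celery actually implements retries:
# retry_sketch.py
# A sketch: re-enqueue a failed task a few times before giving up.
import json

MAX_RETRIES = 3

def run_with_retries(task, serialized_task):
    payload = json.loads(serialized_task)
    try:
        task.run(*payload["args"], **payload["kwargs"])
    except Exception:
        retries = payload.get("retries", 0)
        if retries < MAX_RETRIES:
            payload["retries"] = retries + 1
            # put the task back on its queue so a worker can try it again later
            task.broker.enqueue(queue_name=task.task_name, item=json.dumps(payload))
        else:
            print("task: {0} failed after {1} retries".format(payload["task_id"], retries))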
All the code in this blogpost can be found at:
https://github.com/komuw/komu.engineer/tree/master/blogs/07
PS: Check out wiji, which is an experimental python3 asyncio distributed task processor.