Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Python Celery & RabbitMQ Tutorial

9 Jan 2018 1  
In this tutorial, we are going to have an introduction to basic concepts of Celery with RabbitMQ and then set up Celery for a small demo project.

The post appeared first on Tests4Geeks.

Celery is an asynchronous task queue. It can be used for anything that needs to be run asynchronously. For example, background computation of expensive queries. RabbitMQ is a message broker widely used with Celery. In this tutorial, we are going to have an introduction to basic concepts of Celery with RabbitMQ and then set up Celery for a small demo project. At the end of this tutorial, you will be able to setup a Celery web console monitoring your tasks.

celery-flower

Basic Concepts

Let’s use the below graphic to explain the foundations:

celery_architecture_final

Broker

The Broker (RabbitMQ) is responsible for the creation of task queues, dispatching tasks to task queues according to some routing rules, and then delivering tasks from task queues to workers.

Consumer (Celery Workers)

The Consumer is the one or multiple Celery workers executing the tasks. You could start many workers depending on your use case.

Result Backend

The Result Backend is used for storing the results of your tasks. However, it is not a required element, so if you do not include it in your settings, you cannot access the results of your tasks.

Install Celery

First, we need to install Celery, which is pretty easy using PyPI:

$ pip install celery

Note that it would be better to do this using a virtual environment.

Choose a Broker: RabbitMQ

This is where the confusion begins. Why do we need another thing called broker? It’s because Celery does not actually construct a message queue itself, so it needs an extra message transport (a broker) to do that work. You can think of Celery as a wrapper around a message broker.

In fact, you can choose from a few different brokers, like RabbitMQ, Redis, or a database (e.g., a Django database).

In this tutorial, we are using RabbitMQ as our broker because it is feature-complete, stable and recommended by Celery. In Mac OS, it is easy to install RabbitMQ using Homebrew:

$ brew install rabbitmq
# if you are using Ubuntu or Debian, try:
# sudo apt-get install rabbitmq-server

Start RabbitMQ

Homebrew will install RabbitMQ in /usr/local/sbin although some systems may vary. You can add this path to the environment variable PATH for convenient usage later. For example, open your shell startup file (e.g.~/.bash_profile) and add the following line:

PATH=$PATH:/usr/local/sbin

Now we can start the RabbitMQ server using the command rabbitmq-server. You will see similar output if the RabbitMQ server starts successfully.

$ rabbitmq-server

              RabbitMQ 3.5.1. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

Configure RabbitMQ for Celery

Before we can use RabbitMQ for Celery, we need to do some configurations for RabbitMQ. Briefly speaking, we need to create a virtual host and user, then set user permissions so it can access the virtual host.

# add user 'jimmy' with password 'jimmy123'
$ rabbitmqctl add_user jimmy jimmy123
# add virtual host 'jimmy_vhost'
$ rabbitmqctl add_vhost jimmy_vhost
# add user tag 'jimmy_tag' for user 'jimmy'
$ rabbitmqctl set_user_tags jimmy jimmy_tag
# set permission for user 'jimmy' on virtual host 'jimmy_vhost'
$ rabbitmqctl set_permissions -p jimmy_vhost jimmy ".*" ".*" ".*"

Note that there are three kinds of operations in RabbitMQ: configure, write and read.

The ".*" ".*" ".*" string at the end of the above command means that the user “jimmy” will have all configure, write and read permissions. To find more information about permission control in RabbitMQ, you can refer to http://www.rabbitmq.com/access-control.html.

A Simple Demo Project

Now let’s create a simple project to demonstrate the use of Celery. You can find the demo on Github here.

Project Structure

Below is the structure of our demo project:

test_celery
    __init__.py
    celery.py
    tasks.py
    run_tasks.py

celery.py

Add the following code in celery.py:

from __future__ import absolute_import
from celery import Celery

app = Celery('test_celery',
             broker='amqp://jimmy:jimmy123@localhost/jimmy_vhost',
             backend='rpc://',
             include=['test_celery.tasks'])

Here, we initialize an instance of Celery called app, which is used later for creating a task.

The first argument of Celery is just the name of the project package, which is “test_celery”.

The broker argument specifies the broker URL, which should be the RabbitMQ we started earlier. Note that the format of broker URL should be:
transport://userid:password@hostname:port/virtual_host

For RabbitMQ, the transport is amqp.

The backend argument specifies a backend URL. A backend in Celery is used for storing the task results. So if you need to access the results of your task when it is finished, you should set a backend for Celery.

rpc means sending the results back as AMQP messages, which is an acceptable format for our demo. More choices for message formats can be found here.

The include argument specifies a list of modules that you want to import when Celery worker starts. We add the tasks module here so that the worker can find our task.

tasks.py

In this file, we define our task longtime_add:

from __future__ import absolute_import
from test_celery.celery import app
import time

@app.task
def longtime_add(x, y):
    print 'long time task begins'
    # sleep 5 seconds
    time.sleep(5)
    print 'long time task finished'
    return x + y

You can see that we import the app defined in the previous celery module and use it as a decorator for our task method. Note that app.task is just a decorator. In addition, we sleep 5 seconds in our longtime_add task to simulate a time-expensive task:)

run_tasks.py

After setting up Celery, we need to run our task, which is included in the runs_tasks.py:

from .tasks import longtime_add
import time

if __name__ == '__main__':
    result = longtime_add.delay(1,2)
    # at this time, our task is not finished, so it will return False
    print 'Task finished? ', result.ready()
    print 'Task result: ', result.result
    # sleep 10 seconds to ensure the task has been finished
    time.sleep(10)
    # now the task should be finished and ready method will return True
    print 'Task finished? ', result.ready()
    print 'Task result: ', result.result

Here, we call the task longtime_add using the delay method, which is needed if we want to process the task asynchronously. In addition, we keep the results of the task and print some information. The ready method will return True if the task has been finished, otherwise False. The result attribute is the result of the task (“3” in our case). If the task has not been finished, it returns None.

Start Celery Worker

Now, we can start Celery worker using the command below (run in the parent folder of our project folder test_celery):

$ celery -A test_celery worker --loglevel=info

You will see something like this if Celery successfully connects to RabbitMQ:

$ celery -A test_celery worker --loglevel=info
 -------------- celery@zhangmatoMacBook-Pro.local v3.1.23 (Cipater)
---- **** ----- 
--- * ***  * -- Darwin-15.4.0-x86_64-i386-64bit
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         test_celery:0x10d9b0d10
- ** ---------- .> transport:   amqp://jimmy:**@localhost:5672/jimmy_vhost
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . test_celery.tasks.longtime_add

[2016-04-24 22:41:20,297: INFO/MainProcess] Connected to amqp://jimmy:**@127.0.0.1:5672/jimmy_vhost
[2016-04-24 22:41:20,310: INFO/MainProcess] mingle: searching for neighbors
[2016-04-24 22:41:21,321: INFO/MainProcess] mingle: all alone
[2016-04-24 22:41:21,374: WARNING/MainProcess] celery@zhangmatoMacBook-Pro.local ready.

Run Tasks

In another console, input the following (run in the parent folder of our project folder test_celery):

$ python -m test_celery.run_tasks

Now if you look at the Celery console, you will see that our worker received the task:

[2016-04-24 22:59:16,508: INFO/MainProcess] 
Received task: test_celery.tasks.longtime_add[25ba9c87-69a7-4383-b983-1cefdb32f8b3]
[2016-04-24 22:59:16,508: WARNING/Worker-3] long time task begins
[2016-04-24 22:59:31,510: WARNING/Worker-3] long time task finished
[2016-04-24 22:59:31,512: INFO/MainProcess] 
Task test_celery.tasks.longtime_add[25ba9c87-69a7-4383-b983-1cefdb32f8b3] succeeded in 15.003732774s: 3

As you can see, when our Celery worker received a task, it printed out the task name with a task id (in the bracket):

Received task: test_celery.tasks.longtime_add[7d942984-8ea6-4e4d-8097-225616f797d5]

Below this line are two lines that were printed by our task longtime_add, with a time delay of 5 seconds:

long time task begins
long time task finished

The last line shows that our task was finished in about 5 seconds and the task result is 3:

Task test_celery.tasks.longtime_add[7d942984-8ea6-4e4d-8097-225616f797d5] succeeded in 5.025242167s: 3

In the current console, you will see the following output:

$ python -m test_celery.run_tasks
Task finished? False
Task result: None
Task finished? False
Task result: None

This is the expected behavior. At first, our task was not ready, and the result was None. After 10 seconds, our task has been finished and the result is 3.

Monitor Celery in Real Time

Flower is a real-time web-based monitor for Celery. Using Flower, you could easily monitor your task progress and history.

We can use pip to install Flower:

$ pip install flower

To start the Flower web console, we need to run the following command (run in the parent folder of our project folder test_celery):

$ celery -A test_celery flower

Flower will run a server with default port 5555, and you can access the web console at http://localhost:5555.

screen_shot_9

Conclusion

Celery is easy to set up when used with the RabbitMQ broker, and it hides the complex details of RabbitMQ. You can find the full set code of demo project above on Github.

Jimmy Zhang is a software developer experienced in backend development with Python and Django. You could find more about him on his website http://www.catharinegeek.com/

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here