SoFunction
Updated on 2024-11-15

Python Celery Multi-Queue Configuration Code Example

This article walks through code examples for configuring multiple queues in Python Celery. The sample code is explained in detail and may serve as a useful reference for study or work.

Celery Official Documentation

Project structure

/proj
-__init__
-            # Instantiate the Celery object
-            # Celery configuration file
-            # Celery task definitions


#coding:utf-8
from __future__ import absolute_import
from celery import Celery

app = Celery('proj', include=[''])   # Instantiate the celery object

app.config_from_object('')   # Introducing configuration files

if __name__ == '__main__':            
  ()
  • The first argument, 'proj', is the name of the Celery application.
  • The include parameter is a list of modules to import when the worker starts.


#coding:utf-8
from __future__ import absolute_import

from  import app
@()
def add(x, y):
  return x + y


#coding:utf-8
from kombu import Queue

BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//' # Use RabbitMQ as the message broker


CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # Store task results in Redis

CELERY_TASK_SERIALIZER = 'msgpack' # Serialize and deserialize tasks with msgpack

CELERY_RESULT_SERIALIZER = 'json' # Reading task results is rarely performance-critical, so the more readable JSON is used

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # Task result expiration time in seconds; spelling out 60 * 60 * 24 is clearer than the magic number 86400

CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # Content types the worker accepts

CELERY_QUEUES = (  # Set up add queue, bind routing_key
  Queue('add', routing_key=''),
)


CELERY_ROUTES = {  #This task goes into the add queue and routing_key is
  '': { 
    'queue': 'add',
    'routing_key': '',
  }
}
  • CELERY_ACCEPT_CONTENT lists the accepted content types. msgpack is a more compact and faster format than JSON; using it requires installing the corresponding package.
  • CELERY_QUEUES declares a queue and binds it to the given routing_key; the routing_key can be named arbitrarily.
  • CELERY_ROUTES sets up routing by mapping a task name to a queue and a routing_key. The routing_key here must match the one bound to that queue in CELERY_QUEUES.
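Because a mismatch between the routing_key in the routes and the one bound to the queue is easy to introduce, a small consistency check can help. The following is a minimal sketch that uses plain tuples and dictionaries in place of kombu Queue objects. The names QUEUES, ROUTES, find_route_mismatches, the task name 'example.tasks.add', and the key 'example_key' are all hypothetical and not part of Celery.

```python
# Hypothetical stand-ins for the queue and route settings:
# each queue is a (queue name, routing_key) pair.
QUEUES = (
    ("add", "example_key"),
)

ROUTES = {
    "example.tasks.add": {"queue": "add", "routing_key": "example_key"},
}


def find_route_mismatches(queues, routes):
    """Return a list of human-readable problems found in the routes."""
    bound_keys = dict(queues)  # queue name -> routing_key bound to it
    problems = []
    for task_name, route in routes.items():
        queue = route["queue"]
        if queue not in bound_keys:
            problems.append(f"{task_name}: queue {queue!r} is not declared")
        elif route.get("routing_key") != bound_keys[queue]:
            problems.append(
                f"{task_name}: routing_key {route.get('routing_key')!r} "
                f"does not match {bound_keys[queue]!r} bound to queue {queue!r}"
            )
    return problems


# An empty list means every route points at a declared queue with a matching key.
print(find_route_mismatches(QUEUES, ROUTES))
```

A check like this could run once at startup or in a unit test, so that a typo in a routing_key is caught before any task is published.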

Starting the worker

From the parent directory of proj, run:

celery -A  worker -Q add -l info

is the name of the task, which is the one set in CELERY_ROUTES

add is the configured queue, and key= is the configured routing_key
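The worker command can also be assembled programmatically, which makes the role of the -Q option easier to see: it takes a comma-separated list of the queues the worker should consume from. This is only a sketch; worker_command and the application name example_app are hypothetical placeholders, not names from this project.

```python
import shlex


def worker_command(app, queues, loglevel="info"):
    """Build the argv list for a worker that consumes the given queues."""
    return ["celery", "-A", app, "worker", "-Q", ",".join(queues), "-l", loglevel]


# "example_app" is a placeholder for the module that holds the Celery instance.
print(shlex.join(worker_command("example_app", ["add"])))
print(shlex.join(worker_command("example_app", ["queue1", "queue2"])))
```

The resulting list can be passed to subprocess.run directly, which avoids shell quoting problems when queue names contain unusual characters.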

Publishing tasks

from  import add
(2,3)

What needs to change for multiple queues

CELERY_QUEUES = (  # Set up add queue, bind routing_key
  Queue('add', routing_key=''),
)


CELERY_ROUTES = {  #This task goes into the add queue and routing_key as
  '': { 
    'queue': 'add',
    'routing_key': '',
  }
}

Configure two queues alongside the default queue

# Configure the queue
CELERY_QUEUES = (
  Queue('default', routing_key='default'),
  Queue('Queue 1', routing_key='key1'),
  Queue('Queue 2', routing_key='key2'),
)
# Routing (which task goes into which queue)
CELERY_ROUTES = {
  'Task 1': {'queue': 'Queue 1', 'routing_key': 'key1'},
  'Task 2': {'queue': 'Queue 2', 'routing_key': 'key2'},
}
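To make the routing table concrete, here is a simplified model of how a task name might be mapped to a queue, with unrouted tasks falling back to a default queue. This is not Celery's router implementation: resolve_queue and DEFAULT_ROUTE are hypothetical, and the fallback assumes the default queue has been configured to be named 'default' (Celery's built-in default queue name is 'celery' unless the default-queue setting is changed).

```python
# Routing table modeled as a plain dictionary (task name -> route).
ROUTES = {
    "Task 1": {"queue": "Queue 1", "routing_key": "key1"},
    "Task 2": {"queue": "Queue 2", "routing_key": "key2"},
}

# Assumed fallback for tasks that have no explicit route.
DEFAULT_ROUTE = {"queue": "default", "routing_key": "default"}


def resolve_queue(task_name, routes=ROUTES, default=DEFAULT_ROUTE):
    """Return the route for a task, or the default route if none is configured."""
    return routes.get(task_name, default)


for name in ("Task 1", "Task 2", "Task 3"):
    route = resolve_queue(name)
    print(f"{name} -> {route['queue']} ({route['routing_key']})")
```

In this model "Task 3" has no entry in the table, so it lands in the default queue; a worker must be consuming from that queue for such tasks to run.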
