Getting Started
Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages, and it provides the tools needed to maintain such a system. It is a task queue focused on real-time processing that also supports task scheduling.
Celery works in the producer-consumer model:
Task queue: a task queue is a mechanism for distributing work across threads or machines.
Message queue: the input to the queue is a unit of work called a task. Dedicated worker processes constantly monitor the queue for new tasks to process.
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the client adds a message to the queue, and the broker then delivers that message to a worker, which processes it.
Celery's architecture consists of three parts: the message middleware (message broker), the task execution unit (worker), and the task result store.
Message middleware: Celery does not provide a messaging service itself, but it integrates easily with third-party message middleware such as RabbitMQ, Redis, and MongoDB. This article uses Redis.
Task execution unit: the worker is the unit of task execution provided by Celery. Workers run concurrently on the nodes of a distributed system.
Task result store: stores the results of the tasks executed by workers. Celery supports several result backends, including Redis, MongoDB, Django ORM, and AMQP. I will not look into how the results are stored here; this article simply uses Redis to store task results.
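To make the three parts more concrete, here is a minimal sketch that uses only the Python standard library. It is not how Celery is implemented: an in-process queue.Queue stands in for the broker, a thread stands in for the worker, and a plain dict stands in for the result store. The names send_task, worker, and results are hypothetical and chosen only for illustration.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stands in for the message broker (Redis in this article)
results = {}             # stands in for the task result store


def add(x, y):
    return x + y


def send_task(func, *args):
    """Client side: enqueue a message and return its ID without waiting."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Worker side: consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, func, args = message
        results[task_id] = func(*args)


task_id = send_task(add, 2, 18)   # returns immediately; nothing has run yet
thread = threading.Thread(target=worker)
thread.start()
broker.put(None)                  # ask the worker to stop once the queue is drained
thread.join()
print(results[task_id])
```

The client only ever touches the queue and the result store, never the worker directly, which is the property that lets real workers run on other machines.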
Installation
Celery can be installed with pip:
pip install celery
This article uses Redis as the message middleware, so the Redis client library is needed as well:
pip install redis
The Redis server itself must also be installed. The official website only provides a Linux version for download; on Windows, the exe installer can be downloaded from MicrosoftArchive/redis.
Simple Demo
Let's run a simple task to illustrate how Celery is used. Create two files in the project folder. The first, tasks.py, defines the task:
# tasks.py
import time
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('my_tasks', broker=broker, backend=backend)

@app.task
def add(x, y):
    print('enter task')
    time.sleep(3)
    return x + y
In this code, broker specifies the message middleware used for the task queue, and backend specifies where task results are stored. app is the Celery object we created. The @app.task decorator turns the add function into a task.
The second file is a small client script that submits the task:
from tasks import add

if __name__ == '__main__':
    print('start task')
    result = add.delay(2, 18)
    print('end task')
    print(result)
The delay function serializes the task and sends it to the message middleware. Running the client script with python prints the ID that uniquely identifies the task:
start task
end task
79ef4736-1ecb-4afd-aa5e-b532657acd43
This only pushes the task to Redis. The task has not been consumed yet and waits in the celery queue.
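To give a rough idea of what "serializing a task" means, the sketch below builds a simplified JSON message by hand. This is an assumption-laden illustration and not Celery's actual message protocol, which carries more fields; build_message is a hypothetical helper, and the task name 'tasks.add' assumes the add function lives in tasks.py.

```python
import json
import uuid


def build_message(task_name, args, kwargs=None):
    """Build a simplified, JSON-serialized task message (illustrative only)."""
    body = {
        "id": str(uuid.uuid4()),
        "task": task_name,
        "args": list(args),
        "kwargs": kwargs or {},
    }
    return body["id"], json.dumps(body)


task_id, payload = build_message("tasks.add", (2, 18))
decoded = json.loads(payload)   # roughly what a worker does after receiving the message
print(decoded["task"], decoded["args"])
```

Because only the task name and its arguments travel through the queue, the worker must be able to import the same task code as the client.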
Start a Celery worker to consume the task:
celery -A tasks worker -l info # -A is followed by the module name
The -A option specifies where the Celery object is located, and the -l option specifies the worker's logging level.
If this command reports an error in the terminal:
File "e:\workspace\.env\lib\site-packages\celery\app\", line 537, in _fast_trace_task
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
This problem occurs when running Celery on Windows 10. One solution is to switch to a different Celery version. Alternatively, the issue "Unable to run tasks under Windows" offers two workarounds. The first is to install the eventlet extension:
pip install eventlet
celery -A <mymodule> worker -l info -P eventlet
The second is to set the environment variable FORKED_BY_MULTIPROCESSING = 1 (this is the recommended way):
import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
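As a small aside on setting the variable from Python, the sketch below assumes os.environ.setdefault is used and shows that it only writes the variable when it is not already present, so a value exported in the shell beforehand would be left untouched. The pop call exists only to give the demonstration a clean starting state.

```python
import os

# Start from a clean state so the demonstration is repeatable.
os.environ.pop('FORKED_BY_MULTIPROCESSING', None)

# The first call sets the variable because it is missing.
first = os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
# The second call does not overwrite the existing value.
second = os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '0')

print(first, second, os.environ['FORKED_BY_MULTIPROCESSING'])
```

The assignment presumably needs to run before the worker creates its pool, so placing it at the top of the module that creates the Celery object is a reasonable choice.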
If all goes well and the worker starts properly, you will see the task being consumed in the terminal:
[2018-11-27 13:59:27,830: INFO/MainProcess] Received task: [745e5be7-4675-4f84-9d57-3f5e91c33a19]
[2018-11-27 13:59:27,831: WARNING/SpawnPoolWorker-2] enter task
[2018-11-27 13:59:30,835: INFO/SpawnPoolWorker-2] Task [745e5be7-4675-4f84-9d57-3f5e91c33a19] succeeded in 3.0s: 20
This shows that the demo ran successfully.
Using Configuration Files
In the demo above, the broker and backend are written directly in the code. Celery has many other settings as well, and they are best kept in a configuration file. The basic configuration items are:
- CELERY_DEFAULT_QUEUE: the default queue
- BROKER_URL: the URL of the broker
- CELERY_RESULT_BACKEND: the address of the result store
- CELERY_TASK_SERIALIZER: the task serialization method
- CELERY_RESULT_SERIALIZER: the serialization method for task results
- CELERY_TASK_RESULT_EXPIRES: the expiration time of task results
- CELERY_ACCEPT_CONTENT: a list of the serialization content types that are accepted
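Put together, these items could look like the following configuration module. The values are illustrative assumptions only (local Redis URLs in the style used earlier in this article, JSON serialization, a one-hour expiry); adjust them to your own environment.

```python
# celery_config.py (illustrative values, not recommendations)
CELERY_DEFAULT_QUEUE = 'default'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60   # in seconds
CELERY_ACCEPT_CONTENT = ['json']       # must include the serializers chosen above
```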
Organize the directory structure and wrap the tasks in a package named celery_app.
The contents of the files are as follows:
# __init__.py
import os
from celery import Celery

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

app = Celery('demo')
# Load the configuration module through the Celery instance
app.config_from_object('celery_app.celery_config')

# celery_config.py
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
# Import the specified task modules
CELERY_IMPORTS = (
    'celery_app.task1',
    'celery_app.task2',
)

# task1.py
import time
from celery_app import app

@app.task
def add(x, y):
    print('enter task')
    time.sleep(3)
    return x + y

# task2.py
import time
from celery_app import app

@app.task
def mul(x, y):
    print('enter task')
    time.sleep(4)
    return x * y

# client script
from celery_app import task1

if __name__ == '__main__':
    print('start task')
    result = task1.add.delay(2, 18)
    print('end task')
    print(result)
Submit a task and start a worker:
$ python <client script>
$ celery -A celery_app worker -l info
The call to delay(2, 18) returns a result object. Because delay is non-blocking, the call returns immediately. The result object provides the following methods and attributes:
result.ready()          # Check whether the task has finished: True if completed, False otherwise
result.wait()           # Wait for the task to complete and return its result; rarely used
result.get(timeout=1)   # Get the task result; a waiting time can be set
result.result           # The result of the task execution
result.state            # PENDING, STARTED, SUCCESS: the current state of the task
result.status           # PENDING, STARTED, SUCCESS: the current state of the task
result.successful()     # Return True if the task succeeded
result.traceback        # If the task raised an exception, the original traceback is also available
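Since the call that submits a task does not block, a client often polls until the result is available. The helper below is a sketch of that pattern; wait_for_result is a hypothetical name, and it relies only on an object that offers ready() and get(), which is what the result object is described as providing above. FakeResult is a stand-in so the example runs without a broker.

```python
import time


def wait_for_result(async_result, timeout=10.0, interval=0.5):
    """Poll ready() until the task finishes, then return get(); raise on timeout."""
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError('task did not finish in time')
        time.sleep(interval)
    return async_result.get()


class FakeResult:
    """Stand-in for a task result object; reports ready after a few polls."""

    def __init__(self, value, polls_needed=2):
        self._value = value
        self._polls_left = polls_needed

    def ready(self):
        self._polls_left -= 1
        return self._polls_left < 0

    def get(self):
        return self._value


value = wait_for_result(FakeResult(20), timeout=5.0, interval=0.01)
print(value)
```

In a real client, the object returned when the task is submitted would be passed in place of FakeResult.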
Scheduled Tasks
Scheduled tasks are similar to crontab and can be used for jobs such as daily statistics. First we need to configure the schedule by extending the configuration file above with a CELERYBEAT_SCHEDULE setting:
import datetime
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'task1-every-1-min': {
        'task': 'celery_app.task1.add',
        'schedule': datetime.timedelta(seconds=60),
        'args': (2, 15),
    },
    'task2-once-a-day': {
        'task': 'celery_app.task2.mul',
        'schedule': crontab(hour=15, minute=23),
        'args': (3, 6),
    }
}
task specifies the task to execute; schedule specifies when it runs. datetime.timedelta(seconds=60) means a one-minute interval, which could also be written as crontab(minute='*/1'); args specifies the arguments passed to the task.
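The two kinds of schedule behave differently: an interval is measured from the previous run, while a crontab entry fires at a wall-clock time. The sketch below models that difference with the standard library only. It is a simplification that ignores time zones and is not Celery's scheduler; next_interval_run and next_daily_run are hypothetical helper names.

```python
from datetime import datetime, timedelta


def next_interval_run(last_run, seconds=60):
    """Next run for a fixed-interval schedule such as a 60-second timedelta."""
    return last_run + timedelta(seconds=seconds)


def next_daily_run(now, hour=15, minute=23):
    """Next run for a once-a-day schedule such as crontab(hour=15, minute=23)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so move to tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2018, 11, 27, 13, 59, 27)
print(next_interval_run(now))                          # one minute after the previous run
print(next_daily_run(now))                             # later the same day, at 15:23
print(next_daily_run(datetime(2018, 11, 27, 16, 0)))   # 15:23 has passed, so the next day
```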
Start celery beat:
$ celery -A celery_app beat -l info
We are currently using two windows, one to run the worker and one to run beat. It is also possible to run both in a single window (Linux only):
$ celery -A celery_app worker -B -l info
Task Decorator
@app.task()
def name():
    pass
The task() decorator turns a function into an asynchronous task. name explicitly specifies the task name; serializer specifies the serialization method; bind is a boolean: if True, the task instance is passed as the first argument to the task function, so all attributes of the task instance can be accessed through it.
@app.task(bind=True)  # The first argument is self; use self.request to access related attributes
def add(self, x, y):
    print(self.request.id)
base specifies the base class of the task and can be used to define callback functions:
import celery

class MyTask(celery.Task):
    # Executed when the task fails
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))

    # Executed when the task succeeds
    def on_success(self, retval, task_id, args, kwargs):
        pass

    # Executed when the task is retried
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        pass

@app.task(base=MyTask)
def add(x, y):
    raise KeyError()

exc: the error raised on failure
task_id: the ID of the task
args: the positional arguments of the task function
kwargs: the keyword arguments of the task function
einfo: exception details on failure
retval: the return value of a successfully executed task
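To show when such callbacks fire, here is a simplified model written with plain Python. It is only a sketch of the idea and not Celery's implementation; HookedTask and run_task are hypothetical names, and einfo is passed as None because no traceback object is built here.

```python
class HookedTask:
    """Simplified model of a task base class with success and failure hooks."""

    def __init__(self):
        self.events = []

    def on_success(self, retval, task_id, args, kwargs):
        self.events.append(('success', task_id, retval))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        self.events.append(('failure', task_id, repr(exc)))


def run_task(task, func, task_id, args=(), kwargs=None):
    """Call func and dispatch to the matching hook, as a worker conceptually does."""
    kwargs = kwargs or {}
    try:
        retval = func(*args, **kwargs)
    except Exception as exc:
        task.on_failure(exc, task_id, args, kwargs, einfo=None)
        return None
    task.on_success(retval, task_id, args, kwargs)
    return retval


def failing_add(x, y):
    raise KeyError('boom')


task = HookedTask()
run_task(task, lambda x, y: x + y, 'id-1', args=(2, 18))
run_task(task, failing_add, 'id-2', args=(2, 18))
print(task.events)
```

Keeping the hooks on a shared base class means logging or alerting logic is written once and reused by every task that names that class as its base.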
Summary
Below is a commonly used configuration file that I found online. You can refer to it when needed:
# Note: BROKER_URL is the old-style setting name; since Celery 4 the lowercase name broker_url is preferred
BROKER_URL = 'amqp://username:passwd@host:port/virtual hostname'
# Address of the result backend
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# Task serialization method
CELERY_TASK_SERIALIZER = 'msgpack'
# Result serialization method
CELERY_RESULT_SERIALIZER = 'msgpack'
# Expiration time of task results
CELERY_TASK_RESULT_EXPIRES = 60 * 20
# Serialization content types that are accepted
CELERY_ACCEPT_CONTENT = ["msgpack"]
# Whether tasks are acknowledged only after they complete; this has a small impact on performance
CELERY_ACKS_LATE = True
# Compression scheme; can be zlib or bzip2. By default data is sent uncompressed
CELERY_MESSAGE_COMPRESSION = 'zlib'
# Time limit for completing a task: the task must finish within 5s, otherwise the worker
# executing it is killed and the task is handed over to the parent process
CELERYD_TASK_TIME_LIMIT = 5
# Worker concurrency; defaults to the number of cores on the server,
# and can also be set with the -c option on the command line
CELERYD_CONCURRENCY = 4
# Number of tasks a worker prefetches from RabbitMQ at a time
CELERYD_PREFETCH_MULTIPLIER = 4
# Number of tasks each worker process executes before it is replaced; unlimited by default
CELERYD_MAX_TASKS_PER_CHILD = 40
# Use the default database scheduler of django-celery, which stores the task
# execution schedule in the ORM database you specify
# CELERYBEAT_SCHEDULER = ''
# Name of the default queue. A message that matches no other queue is placed in the
# default queue; if nothing is configured, all messages go to the default queue
CELERY_DEFAULT_QUEUE = "default"
# Detailed queue settings
CELERY_QUEUES = {
    "default": {  # This is the default queue specified above
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": {  # A topic queue: any routing key that starts with "topic." is put into this queue
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": {  # A fanout exchange
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
}
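The "topic.#" routing key above uses AMQP topic-exchange wildcards, where "*" stands for exactly one dot-separated word and "#" stands for zero or more words. The matching itself is done by the broker; the function below is only a sketch of those rules so the pattern can be tried out, and topic_matches is a hypothetical name.

```python
def topic_matches(pattern, routing_key):
    """AMQP-style topic match: '*' is exactly one word, '#' is zero or more words."""

    def match(p, k):
        if not p:
            return not k
        if p[0] == '#':
            # '#' may absorb zero words, or absorb one word and stay active.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if p[0] == '*' or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False

    return match(pattern.split('.'), routing_key.split('.'))


print(topic_matches('topic.#', 'topic.test.one'))
print(topic_matches('topic.#', 'other.test'))
print(topic_matches('topic.*', 'topic.a.b'))
```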
This is the whole content of this article.