SoFunction
Updated on 2024-11-13

Simple use of tornado+celery in detail

celery is a good choice for implementing a simple, flexible and reliable distributed task queuing system

tornado needs little introduction

Installing rabbitmq on the development machine is not covered here

First the task file is written

#coding=utf-8
from celery import Celery
from  import worker as celery_worker
import celeryconfig

broker = 'amqp://'
backend = 'amqp'
app = Celery('celery_test', backend=backend, broker=broker)
app.config_from_object(celeryconfig)

@
def mytask0(task_name):
  print "task0:%s" %task_name
  return task_name 

@
def mytask1(task_name):
  print "task1:%s" %task_name
  return task_name 

def worker_start():
  worker = celery_worker.worker(app=app)
  (broker=broker, concurrency=4,
        traceback=False, loglevel='INFO')

if __name__ == "__main__":
  worker_start()

The file contains the configuration for celery

#coding=utf-8
from kombu import Queue
CELERY_DEFAULT_QUEUE = 'mytask0'
CELERY_QUEUES = (
  Queue('mytask0',  routing_key='task.mytask0'),
  Queue('mytask1',  routing_key='task.mytask1'),
)
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'task.mytask0'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ROUTES = {
  'task.mytask0': {
    'queue': 'mytask0',
    'routing_key': 'task.mytask0',
  },
  'task.mytask1': {
    'queue': 'mytask1',
    'routing_key': 'task.mytask1',
  },
}

Executing python will start the worker.

tornado calls celery to make a blocking task non-blocking

This uses the tcelery module, a non-blocking broker implementation of tornado.

#coding=utf-8
from tornado import web
import task

class TestHandler():

  @
  def get(self):
    task.mytask0.apply_async(
      args=['task0'],
         queue='mytask0',
         routing_key='task.mytask0',
         callback=self.on_success)
  def on_success(self, result):
    ({'task':})

Used to implement the startup of tornado services

#coding=utf-8
import tornado
from  import define, options, parse_command_line
from  import enable_pretty_logging
import tcelery
from app import TestHandler
import 

define("port", default=8000, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")

urls = [(r"/api/task/test", TestHandler)]

def server_start():
  app = (urls, debug=)
  enable_pretty_logging()
  parse_command_line()
  server = (app)
  ()
  (2)
  tcelery.setup_nonblocking_producer(limit=2)
  ().start()

if __name__ == "__main__":
  server_start()

Execute python to start the service

Above this tornado + celery simple use of details is all that I share with you, I hope to give you a reference, and I hope you support me more.