
Python's approach to asynchronous task processing with Beanstalkd

This article uses Beanstalkd as a message queuing service and combines it with Python's decorator syntax to implement a simple asynchronous task-processing tool.

final result

Define the task.

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
    # do task
    pass

Submit a task.

task_one.put(arg1="a", arg2="b", arg3="c")

The task is then executed by a background worker process.
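
For the worker side, a minimal sketch of the consumer process might look like this (assuming the Worker class shown later in this article, exported from the same xxxxx module path):

# run_worker.py -- consumes jobs from the tube and executes the registered functions
from xxxxx.job_queue import Worker

worker = Worker(tubes=['task_tube_one'])
worker.run()  # blocks: reserves jobs from beanstalkd and runs them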

implementation process

1. Understanding Beanstalk Server

Beanstalk is a simple, fast work queue: https://github.com/kr/beanstalkd

Beanstalkd is a message queuing service implemented in C. It provides a generic interface and was originally designed to reduce page latency in high-traffic web applications by running time-consuming tasks asynchronously. There are Beanstalkd client implementations for different languages; for Python there is beanstalkc, for example, which is what I use to communicate with the beanstalkd server.
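
As a rough sketch of what beanstalkc's generic interface looks like (assuming a beanstalkd server running on localhost at its default port 11300):

import beanstalkc

conn = beanstalkc.Connection(host='localhost', port=11300)
conn.use('demo_tube')           # producer side: select the tube to put jobs into
conn.put('hello')               # enqueue a raw string

conn.watch('demo_tube')         # consumer side: subscribe to the tube
job = conn.reserve(timeout=5)   # block until a job arrives, or time out
print(job.body)                 # -> 'hello'
job.delete()                    # acknowledge that the job is done
conn.close()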

2. Asynchronous task implementation principle

beanstalkd can only schedule tasks expressed as strings. For the program to support submitting a function plus its arguments, and then have the worker execute that function with those arguments, an intermediate layer is needed that registers functions and serializes the arguments passed in.
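
In other words, every call must be reduced to a plain string. A sketch of the kind of payload used here (the exact field names come from the Putter code below):

import json

payload = json.dumps({
    'func_name': 'task_one',
    'tube': 'task_tube_one',
    'kwargs': {'arg1': 'a', 'arg2': 'b', 'arg3': 'c'},
})
# The worker reverses the process and looks the function up by name:
message = json.loads(payload)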

The implementation consists of three main parts.

Subscriber: responsible for registering a function to a beanstalk tube. The implementation is simple: it records the mapping from function name to the function itself (which also means two functions registered in the same tube cannot share a name). The data is stored in a class variable.

from collections import defaultdict
import logging


class Subscriber(object):
    FUN_MAP = defaultdict(dict)

    def __init__(self, func, tube):
        logging.info('register func:{} to tube:{}.'.format(func.__name__, tube))
        Subscriber.FUN_MAP[tube][func.__name__] = func
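
After the decorator from the "final result" example has run, the registry would hold something like this:

# Subscriber.FUN_MAP == {'task_tube_one': {'task_one': <function task_one>}}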

JobQueue: a decorator factory that converts an ordinary function into a Putter, registering it with Subscriber along the way.

class JobQueue(object):
    @classmethod
    def task(cls, tube):
        def wrapper(func):
            Subscriber(func, tube)
            return Putter(func, tube)

        return wrapper
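
Decorator sugar aside, applying the decorator (using the queue instance from the example at the top) is equivalent to the explicit call below; afterwards the name task_one is bound to a Putter instance, not to the original function:

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
    pass

# ...is the same as:
# task_one = queue.task('task_tube_one')(task_one)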

Putter: combines the function name, the function arguments, and the specified tube into an object, serializes it to a JSON string, and finally pushes it onto the beanstalkd queue via beanstalkc.

import json
import logging

import beanstalkc

# Assumed shape of the connection settings used throughout:
# BEANSTALK_CONFIG = {'host': 'localhost', 'port': 11300}


class Putter(object):
    def __init__(self, func, tube):
        self.func = func
        self.tube = tube

    # Calling directly runs the function synchronously
    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    # Push the call onto the offline queue
    def put(self, **kwargs):
        args = {
            'func_name': self.func.__name__,
            'tube': self.tube,
            'kwargs': kwargs
        }
        logging.info('put job:{} to queue'.format(args))
        beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
        try:
            beanstalk.use(self.tube)
            job_id = beanstalk.put(json.dumps(args))
            return job_id
        finally:
            beanstalk.close()
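
With Putter in place, the decorated name supports both call styles:

task_one(arg1='a', arg2='b', arg3='c')      # runs immediately in this process via __call__
task_one.put(arg1='a', arg2='b', arg3='c')  # serialized and queued; a worker runs it later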

Worker: takes the string off the beanstalkd queue, deserializes it into an object, extracts the function name, arguments, and tube, then looks the function up in Subscriber and executes it with the arguments.

import json
import logging
import signal
import time

import beanstalkc


class Worker(object):
    worker_id = 0

    def __init__(self, tubes):
        self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
        self.tubes = tubes
        self.reserve_timeout = 20
        self.timeout_limit = 1000
        self.kick_period = 600
        self.release_delay = 0
        self.age = 0  # attribute name lost in the original source; 'age' is a guess
        self.signal_shutdown = False
        signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
        Worker.worker_id += 1
        # Import the module that defines the tasks (the path prefix is elided in the original)
        import_module_by_str('.controller_crawler')

    def subscribe(self):
        if isinstance(self.tubes, list):
            for tube in self.tubes:
                if tube not in Subscriber.FUN_MAP.keys():
                    logging.error('tube:{} not register!'.format(tube))
                    continue
                self.beanstalk.watch(tube)
        else:
            if self.tubes not in Subscriber.FUN_MAP.keys():
                logging.error('tube:{} not register!'.format(self.tubes))
                return
            self.beanstalk.watch(self.tubes)

    def run(self):
        self.subscribe()
        while True:
            if self.signal_shutdown:
                logging.info("graceful shutdown")
                break
            job = self.beanstalk.reserve(timeout=self.reserve_timeout)  # blocks for a job, waiting at most reserve_timeout
            if not job:
                continue
            try:
                self.on_job(job)
                self.delete_job(job)
            except beanstalkc.CommandFailed as e:
                logging.error(e, exc_info=1)
            except Exception as e:
                logging.error(e)
                kicks = job.stats()['kicks']
                if kicks < 3:
                    self.bury_job(job)
                else:
                    message = job.body
                    logging.error("Kicks reach max. Delete the job", extra={'body': message})
                    self.delete_job(job)

    @classmethod
    def on_job(cls, job):
        start = time.time()
        msg = json.loads(job.body)
        logging.info(msg)
        tube = msg.get('tube')
        func_name = msg.get('func_name')
        try:
            func = Subscriber.FUN_MAP[tube][func_name]
            kwargs = msg.get('kwargs')
            func(**kwargs)
            logging.info(u'{}-{}'.format(func, kwargs))
        except Exception as e:
            logging.error(e, exc_info=True)
        cost = time.time() - start
        logging.info('{} cost {}s'.format(func_name, cost))

    @classmethod
    def delete_job(cls, job):
        try:
            job.delete()
        except beanstalkc.CommandFailed as e:
            logging.error(e, exc_info=1)

    @classmethod
    def bury_job(cls, job):
        try:
            job.bury()
        except beanstalkc.CommandFailed as e:
            logging.error(e, exc_info=1)

    def graceful_shutdown(self):
        self.signal_shutdown = True
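
Note that buried jobs do not return to the ready queue on their own; something has to kick them back, which is presumably what kick_period is for. A minimal sketch of kicking buried jobs by hand (the periodic scheduling itself is not shown in the code above):

import beanstalkc

conn = beanstalkc.Connection(host='localhost', port=11300)
conn.use('task_tube_one')
kicked = conn.kick(bound=10)  # move up to 10 buried jobs in this tube back to ready
print('{} job(s) kicked'.format(kicked))
conn.close()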

While writing the above code, a problem was discovered:

The mapping between function names and functions is registered through Subscriber, which runs in one Python interpreter, i.e. one process, while the worker runs asynchronously in another process. How, then, can the worker see the same Subscriber registrations as the Putter? It finally became clear that Python's decorator mechanism solves this problem.

This is the line that solves the Subscriber problem:

import_module_by_str('.controller_crawler')

# import_module_by_str implementation
def import_module_by_str(module_name):
    if isinstance(module_name, unicode):  # Python 2 check; unicode does not exist in Python 3
        module_name = str(module_name)
    __import__(module_name)

When import_module_by_str is executed, __import__ is called to load the module dynamically. As soon as the module hosting the JobQueue-decorated functions is loaded into memory, the Python interpreter executes the @ decorator code, which populates the name-to-function mapping in Subscriber. So when the Worker starts, it imports that same module in its own process and thereby rebuilds the same registrations.
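
To make the mechanism concrete: suppose the tasks live in a hypothetical module tasks.py. In the worker process, merely importing that module executes the decorators, which re-populates Subscriber.FUN_MAP there:

import_module_by_str('tasks')  # runs @queue.task(...) for every decorated function in tasks.py
assert 'task_one' in Subscriber.FUN_MAP['task_tube_one']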

Practical use can be seen at https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

That is all for this article.