Use Beanstalkd as a message queue and combine it with Python's decorator syntax to implement a simple asynchronous task processing tool.

Final result
Define a task:

```python
from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
    # do task
    pass
```
Submit a job:

```python
task_one.put(arg1="a", arg2="b", arg3="c")
```
The task is then executed by a background worker process.

Implementation
1. Understanding the Beanstalkd server

Beanstalk is a simple, fast work queue. (/kr/beanstalkd)
Beanstalkd is a message queue service written in C. It exposes a generic interface and was originally designed to reduce page latency in web applications by running time-consuming tasks asynchronously. Beanstalkd client libraries exist for many languages; in Python, for example, there is beanstalkc, which is what I use to communicate with the beanstalkd server.

2. How the asynchronous tasks work
beanstalkd can only schedule strings. For the program to support submitting a function with its arguments, and then have the worker execute that function with those arguments, an intermediate layer is needed that registers the function and carries the arguments across the queue as a string.

The implementation consists of four main parts.
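The core idea can be illustrated with plain JSON: a function call is reduced to a string payload, which is the only thing beanstalkd can transport. This is a minimal sketch; the field names mirror the ones used by Putter later in this article.

```python
import json

# A function call reduced to a plain string payload,
# the only form beanstalkd can carry.
payload = json.dumps({
    'func_name': 'task_one',
    'tube': 'task_tube_one',
    'kwargs': {'arg1': 'a', 'arg2': 'b', 'arg3': 'c'},
})

# The worker side decodes the string back into a call description.
decoded = json.loads(payload)
print(decoded['func_name'])       # task_one
print(decoded['kwargs']['arg1'])  # a
```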
Subscriber: registers a function to a beanstalk tube. The implementation is simple: it records the mapping between the function name and the function itself (which means two functions in the same tube cannot share a name). The mapping is stored in a class variable.

```python
from collections import defaultdict
import logging

class Subscriber(object):
    FUN_MAP = defaultdict(dict)

    def __init__(self, func, tube):
        logging.info('register func:{} to tube:{}.'.format(func.__name__, tube))
        Subscriber.FUN_MAP[tube][func.__name__] = func
```
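The registry structure itself is just a nested dict. A standalone toy version (the tube and function names here are invented for illustration) shows how a worker holding only strings can recover and call the function:

```python
from collections import defaultdict

FUN_MAP = defaultdict(dict)  # tube -> {func_name: func}

def send_mail(address):
    return 'mail sent to {}'.format(address)

# Register the function under its tube, keyed by name.
FUN_MAP['mail_tube'][send_mail.__name__] = send_mail

# Given only the strings 'mail_tube' and 'send_mail',
# the function can be recovered and invoked.
func = FUN_MAP['mail_tube']['send_mail']
result = func(address='a@example.com')
print(result)  # mail sent to a@example.com
```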
JobQueue: a decorator factory that turns an ordinary function into a Putter, registering it with Subscriber along the way.

```python
class JobQueue(object):
    @classmethod
    def task(cls, tube):
        def wrapper(func):
            Subscriber(func, tube)
            return Putter(func, tube)
        return wrapper
```
Putter: combines the function name, the function arguments, and the specified tube into an object, serializes it to a JSON string, and pushes it to the beanstalkd queue via beanstalkc.

```python
import json
import logging

import beanstalkc

class Putter(object):
    def __init__(self, func, tube):
        self.func = func
        self.tube = tube

    # A direct call just runs the function synchronously
    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    # Push the call to the offline queue
    def put(self, **kwargs):
        args = {
            'func_name': self.func.__name__,
            'tube': self.tube,
            'kwargs': kwargs
        }
        logging.info('put job:{} to queue'.format(args))
        beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'],
                                          port=BEANSTALK_CONFIG['port'])
        try:
            beanstalk.use(self.tube)
            job_id = beanstalk.put(json.dumps(args))
            return job_id
        finally:
            beanstalk.close()
```
Worker: takes a string from the beanstalkd queue, deserializes it into an object, extracts the function name, arguments, and tube, looks the function up in Subscriber, and executes it with the arguments.

```python
import json
import logging
import signal
import time

import beanstalkc

class Worker(object):
    worker_id = 0

    def __init__(self, tubes):
        self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'],
                                               port=BEANSTALK_CONFIG['port'])
        self.tubes = tubes
        self.reserve_timeout = 20
        self.timeout_limit = 1000
        self.kick_period = 600
        self.release_delay = 0
        self.signal_shutdown = False
        # Shut down gracefully on SIGTERM
        signal.signal(signal.SIGTERM,
                      lambda signum, frame: self.graceful_shutdown())
        Worker.worker_id += 1
        # Load the module that defines the tasks, so the decorators run
        # and fill Subscriber.FUN_MAP in this process
        import_module_by_str('.controller_crawler')

    def subscribe(self):
        if isinstance(self.tubes, list):
            for tube in self.tubes:
                if tube not in Subscriber.FUN_MAP.keys():
                    logging.error('tube:{} not register!'.format(tube))
                    continue
                self.beanstalk.watch(tube)
        else:
            if self.tubes not in Subscriber.FUN_MAP.keys():
                logging.error('tube:{} not register!'.format(self.tubes))
                return
            self.beanstalk.watch(self.tubes)

    def run(self):
        self.subscribe()
        while True:
            if self.signal_shutdown:
                logging.info("graceful shutdown")
                break
            # Block waiting for a job, for at most reserve_timeout seconds
            job = self.beanstalk.reserve(timeout=self.reserve_timeout)
            if not job:
                continue
            try:
                self.on_job(job)
                self.delete_job(job)
            except beanstalkc.CommandFailed as e:
                logging.error(e, exc_info=1)
            except Exception as e:
                logging.error(e)
                kicks = job.stats()['kicks']
                if kicks < 3:
                    self.bury_job(job)
                else:
                    message = job.body
                    logging.error("Kicks reach max. Delete the job",
                                  extra={'body': message})
                    self.delete_job(job)

    @classmethod
    def on_job(cls, job):
        start = time.time()
        msg = json.loads(job.body)
        logging.info(msg)
        tube = msg.get('tube')
        func_name = msg.get('func_name')
        try:
            func = Subscriber.FUN_MAP[tube][func_name]
            kwargs = msg.get('kwargs')
            func(**kwargs)
            logging.info(u'{}-{}'.format(func, kwargs))
        except Exception as e:
            logging.error(e, exc_info=True)
        cost = time.time() - start
        logging.info('{} cost {}s'.format(func_name, cost))

    @classmethod
    def delete_job(cls, job):
        try:
            job.delete()
        except beanstalkc.CommandFailed as e:
            logging.error(e, exc_info=1)

    @classmethod
    def bury_job(cls, job):
        try:
            job.bury()
        except beanstalkc.CommandFailed as e:
            logging.error(e, exc_info=1)

    def graceful_shutdown(self):
        self.signal_shutdown = True
```
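Stripped of beanstalkd, the worker's dispatch step amounts to the following sketch, where an in-memory list stands in for the tube and all names are invented for illustration:

```python
import json
from collections import defaultdict

FUN_MAP = defaultdict(dict)

def greet(name):
    return 'hello {}'.format(name)

FUN_MAP['demo_tube']['greet'] = greet

# An in-memory list stands in for the beanstalkd tube.
queue = [json.dumps({'func_name': 'greet', 'tube': 'demo_tube',
                     'kwargs': {'name': 'world'}})]

# The on_job logic: deserialize, look up the function, execute it.
msg = json.loads(queue.pop(0))
func = FUN_MAP[msg['tube']][msg['func_name']]
result = func(**msg['kwargs'])
print(result)  # hello world
```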
While writing the code above, a problem surfaced:

The mapping between function names and functions is registered through Subscriber inside one Python process, but the worker runs asynchronously in a different process. How, then, does the worker obtain the same Subscriber registrations as the Putter? It finally became clear that Python's decorator mechanism solves this.
This is the line that solves the Subscriber problem:

```python
import_module_by_str('.controller_crawler')
```

```python
# import_module_by_str implementation
def import_module_by_str(module_name):
    if isinstance(module_name, unicode):  # Python 2
        module_name = str(module_name)
    __import__(module_name)
```
When import_module_by_str is executed, __import__ is called to load the module dynamically. As soon as the module hosting the JobQueue-decorated functions is loaded into memory, the Python interpreter executes the @-decorator code. So when the Worker starts, the same name-to-function mapping in Subscriber gets populated in the worker's own process.
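The effect of `__import__` can be checked with a standard-library module (a small sketch, using `json` simply because it is always available):

```python
import sys

# __import__ loads a module given its name as a string,
# which is exactly what import_module_by_str relies on.
__import__('json')

# The module is now loaded and registered in sys.modules.
json_mod = sys.modules['json']
print(json_mod.dumps({'ok': True}))  # {"ok": true}
```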
A practical example can be found at /jiyangg/Pear/blob/master/pear/jobs/job_queue.py

That's all for this article.