SoFunction
Updated on 2024-11-10

Python distributed processes explained in detail

Between Thread and Process, Process should be preferred: it is more stable, and processes can be distributed across multiple machines, whereas threads can at most be spread across the CPUs of a single machine.

Python's multiprocessing module supports multiple processes, and its managers submodule additionally supports distributing those processes across multiple machines. A service process can act as a scheduler and hand out tasks to other processes, communicating with them over the network. Because the managers module is well encapsulated, distributed multi-process programs are easy to write without understanding the details of network communication.

As an example, suppose we already have a multi-process program on a single machine whose processes communicate through a Queue. Because the process that handles tasks carries a heavy load, we would like to put the process that sends tasks and the process that handles them on two different machines. How can this be done with distributed processes?

The original Queue can still be used: exposing it over the network through the managers module lets processes on other machines access it.

Let's look at the service process, which is responsible for starting the Queue, registering the Queue with the network, and then writing tasks to the Queue:

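# task_master.py

import random, queue
from multiprocessing.managers import BaseManager

# Queue for sending tasks.
task_queue = queue.Queue()
# Queue for receiving results.
result_queue = queue.Queue()

# Module-level functions are used instead of lambdas: under the spawn and
# forkserver start methods the manager has to pickle these callables when it
# launches its server process, and lambdas cannot be pickled.
def return_task_queue():
    return task_queue

def return_result_queue():
    return result_queue

# QueueManager inherits from BaseManager.
class QueueManager(BaseManager):
    pass

if __name__ == '__main__':
    # Register both Queues on the network; the callable parameter ties each name to a Queue object.
    QueueManager.register('get_task_queue', callable=return_task_queue)
    QueueManager.register('get_result_queue', callable=return_result_queue)
    # Bind port 5000 and set the authentication key to b'abc'.
    manager = QueueManager(address=('', 5000), authkey=b'abc')
    # Start the manager, which serves the Queues from its own process.
    manager.start()
    # Get the Queue objects as accessed over the network.
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # Put a few tasks in.
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)
    # Read results from the result queue.
    print('Try get results...')
    for i in range(10):
        r = result.get(timeout=10)
        print('Result: %s' % r)
    # Shut the manager down.
    manager.shutdown()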

Note that in a multi-process program on a single machine, the Queue we create can be used directly. In a distributed environment, however, tasks must not be added to the original task_queue directly, because that would bypass the QueueManager: once manager.start() has launched the server process, the Queue served over the network lives in that process, not in the script's own task_queue variable. Tasks must be added through the Queue interface returned by manager.get_task_queue().

Then, start the task process on another machine (starting it on the same machine also works):

# task_worker.py

import queue
from multiprocessing.managers import BaseManager

# Create a matching QueueManager.
class QueueManager(BaseManager):
    pass

# This QueueManager only fetches the Queues over the network, so register the names only.
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# Connect to the server, i.e. the machine running task_master.py.
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# The port and authkey must match the settings in task_master.py exactly.
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
# Get the Queue objects.
task = m.get_task_queue()
result = m.get_result_queue()
# Take tasks from the task queue and write the results to the result queue.
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n * n)
        result.put(r)
    except queue.Empty:
        print('task queue is empty.')
# Done.
print('worker exit.')

The task process has to connect to the service process over the network, so specify the IP of the service process.

Now it's time to try out how well distributed processes work. Start the task_master.py service process first:

$ python3 task_master.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

After the task_master.py process has sent the tasks, it waits for results on the result queue. Now start the task_worker.py process:

$ python3 task_worker.py
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

The task_worker.py process ends, and the results continue to be printed out in the task_master.py process:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

What is this simple Master/Worker model good for? It is simple, but it is genuinely distributed computing. With small changes to the code you can start multiple workers and spread the tasks over several or even dozens of machines. For example, replace the n * n calculation with code that sends mail, and you have a mail queue that sends asynchronously.
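To make the "start multiple workers" idea concrete, here is a minimal sketch of a worker loop that keeps pulling tasks until the queue stays empty, so several workers can share one task queue. The names drain_tasks and square are hypothetical and not part of the original scripts. The sketch is exercised with local queue.Queue objects; it assumes the proxies returned by get_task_queue() and get_result_queue() can be passed in the same way, since they expose the same get and put methods.

```python
import queue

def drain_tasks(task, result, handle, timeout=1):
    """Pull tasks until none arrives within `timeout` seconds; return the count handled."""
    done = 0
    while True:
        try:
            n = task.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived in time: assume there is no more work for this worker.
            return done
        result.put(handle(n))
        done += 1

def square(n):
    # Stand-in for the real work; swap in mail sending or anything else.
    return '%d * %d = %d' % (n, n, n * n)

# Local demonstration with plain queues (no network involved).
tasks, results = queue.Queue(), queue.Queue()
for n in (3, 4):
    tasks.put(n)
count = drain_tasks(tasks, results, square, timeout=0.1)
print(count)  # 2
```

Because each worker stops only when the queue runs dry, it no longer needs to know in advance how many tasks the master has queued.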

The Queue is accessible over the network because it is served through QueueManager. Since a QueueManager can manage more than one Queue, each Queue needs its own named network interface, such as get_task_queue.
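As a small illustration of the naming, the sketch below registers three Queues under three names and checks that each name becomes a method on the manager class. The mail_queue and get_mail_queue names are hypothetical, added only for this example; no server is started here.

```python
import queue
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()
result_queue = queue.Queue()
mail_queue = queue.Queue()  # hypothetical third queue, for illustration only

def return_task_queue():
    return task_queue

def return_result_queue():
    return result_queue

def return_mail_queue():
    return mail_queue

class QueueManager(BaseManager):
    pass

# One registered name per Queue; each name becomes a method on QueueManager.
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
QueueManager.register('get_mail_queue', callable=return_mail_queue)

names = ('get_task_queue', 'get_result_queue', 'get_mail_queue')
registered = [name for name in names if hasattr(QueueManager, name)]
print(registered)
```

A worker that only needs the mail queue would register just 'get_mail_queue' (without a callable) on its own manager class before connecting.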

What is the authkey for? It ensures that the two machines communicate normally and are not maliciously interfered with by other machines. If the authkey in task_worker.py does not match the one in task_master.py, the connection will fail.
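The general idea behind a shared key can be sketched with the standard hmac module: one side sends a random challenge, the other answers with a digest computed from the challenge and the key, and the key itself never travels over the wire. This is only an illustration of the principle, under the assumption that a simple challenge/response is enough to convey it. The function names are hypothetical, and the actual message format used by multiprocessing is not reproduced here.

```python
import hmac
import os

def make_challenge():
    # Random bytes that the connecting side must sign with the shared key.
    return os.urandom(20)

def answer(authkey, challenge):
    # Digest of the challenge, keyed with the shared secret.
    return hmac.new(authkey, challenge, 'sha256').digest()

def verify(authkey, challenge, response):
    # Constant-time comparison against the digest we expect.
    return hmac.compare_digest(answer(authkey, challenge), response)

challenge = make_challenge()
good = verify(b'abc', challenge, answer(b'abc', challenge))  # matching keys
bad = verify(b'abc', challenge, answer(b'xyz', challenge))   # mismatched keys
print(good, bad)  # True False
```

Note that a key of this kind authenticates the peer; it does not by itself encrypt the tasks and results that follow.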

Python's distributed process interface is simple and well encapsulated, which makes it suitable for environments where heavy tasks need to be distributed across multiple machines.

Note that the Queue is meant to be used to pass tasks and receive results, and the amount of data describing each task should be as small as possible. For example, if you send a task to process a log file, instead of sending several hundred megabytes of the log file itself, you send the full path to where the log file is stored, and the Worker process then goes to the shared disk to read the file.
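A rough way to see the difference is to compare serialized sizes, since objects passed through a manager are pickled by default. In this sketch the path, the task dictionary layout, and the 10 MB stand-in payload are all assumptions made for illustration.

```python
import pickle

# Hypothetical task description: only says where the log file lives.
log_path = '/mnt/shared/logs/app.log'
task = {'action': 'parse_log', 'path': log_path}

# Stand-in for shipping the file contents themselves (10 MB of bytes).
payload = b'x' * (10 * 1024 * 1024)

small = len(pickle.dumps(task))
big = len(pickle.dumps(payload))
print('task description: %d bytes, file contents: %d bytes' % (small, big))
```

The task description stays at a few dozen bytes no matter how large the log file grows, which is why it is the better thing to put on the Queue.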

That concludes this introduction to distributed processes in Python. I hope it helps you and makes learning Python easier.