
Python multi-process programming with the multiprocessing module: usage examples and analysis

This article analyzes the usage of Python's multiprocessing module through examples, shared for your reference as follows:

Introduction to multiprocessing

Managing processes the way you manage threads is the core idea of multiprocessing. Its API is very similar to threading, but because it runs separate processes it makes much better use of multi-core CPUs than threading does.
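As a quick, hedged illustration of that claim, the following minimal sketch (not part of the original article; the helper names count(), run() and the constant N are made up) runs the same CPU-bound loop in four threads and then in four processes and times each run. On a multi-core machine the process version typically finishes much faster, because CPython's GIL keeps the threads on a single core:

import multiprocessing
import threading
import time

N = 10 ** 7

def count():
  # A purely CPU-bound loop, so the GIL limits the threaded version
  total = 0
  for i in xrange(N):
    total += i

def run(make_worker):
  # make_worker is either threading.Thread or multiprocessing.Process;
  # both accept the same target= keyword argument
  workers = [make_worker(target=count) for _ in range(4)]
  start = time.time()
  for w in workers:
    w.start()
  for w in workers:
    w.join()
  return time.time() - start

if __name__ == '__main__':
  print 'threads  :', run(threading.Thread)
  print 'processes:', run(multiprocessing.Process)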

Creating a simple process:

import multiprocessing

def worker(num):
  """worker function"""
  print 'Worker:', num
  return

if __name__ == '__main__':
  jobs = []
  for i in range(5):
    p = multiprocessing.Process(target=worker, args=(i,))
    jobs.append(p)
    p.start()

Identifying the current process: giving each process a name makes it easier to identify and track.

import multiprocessing
import time

def worker():
  name = multiprocessing.current_process().name
  print name, 'Starting'
  time.sleep(2)
  print name, 'Exiting'

def my_service():
  name = multiprocessing.current_process().name
  print name, 'Starting'
  time.sleep(3)
  print name, 'Exiting'

if __name__ == '__main__':
  service = multiprocessing.Process(name='my_service',
                                    target=my_service)
  worker_1 = multiprocessing.Process(name='worker 1',
                                     target=worker)
  worker_2 = multiprocessing.Process(target=worker)  # default name
  worker_1.start()
  worker_2.start()
  service.start()

A daemon process does not block the main program from exiting; set its daemon attribute to True before calling start(). If you do want to wait for a daemon process to finish, call join(); join() also accepts an optional float timeout, and after waiting that long it stops waiting.

Daemon process:

import multiprocessing
import time
import sys

def daemon():
  name = multiprocessing.current_process().name
  print 'Starting:', name
  time.sleep(2)
  print 'Exiting :', name

def non_daemon():
  name = multiprocessing.current_process().name
  print 'Starting:', name
  print 'Exiting :', name

if __name__ == '__main__':
  d = multiprocessing.Process(name='daemon',
                              target=daemon)
  d.daemon = True
  n = multiprocessing.Process(name='non-daemon',
                              target=non_daemon)
  n.daemon = False
  d.start()
  n.start()
  d.join(1)
  print 'd.is_alive()', d.is_alive()
  n.join()

It is better to stop a process with a poison pill (as in the consumer example later in this article), but you can also force it to stop with terminate(). Note that terminate() should be followed by join() so that the process management code can update the object's status.

Terminate the process:

import multiprocessing
import time

def slow_worker():
  print 'Starting worker'
  time.sleep(0.1)
  print 'Finished worker'

if __name__ == '__main__':
  p = multiprocessing.Process(target=slow_worker)
  print 'BEFORE:', p, p.is_alive()
  p.start()
  print 'DURING:', p, p.is_alive()
  p.terminate()
  print 'TERMINATED:', p, p.is_alive()
  p.join()
  print 'JOINED:', p, p.is_alive()

①. exitcode == 0: the process exited without error
②. exitcode > 0: the process had an error and exited with that error code
③. exitcode < 0: the process was killed by the signal -1 * exitcode

The exit status of the process:

import multiprocessing
import sys
import time

def exit_error():
  sys.exit(1)

def exit_ok():
  return

def return_value():
  return 1

def raises():
  raise RuntimeError('There was an error!')

def terminated():
  time.sleep(3)

if __name__ == '__main__':
  jobs = []
  for f in [exit_error, exit_ok, return_value, raises, terminated]:
    print 'Starting process for', f.func_name
    j = multiprocessing.Process(target=f, name=f.func_name)
    jobs.append(j)
    j.start()
  jobs[-1].terminate()
  for j in jobs:
    j.join()
    print '%s.exitcode = %s' % (j.name, j.exitcode)

Debugging is more convenient with logging enabled.

Logging:

import multiprocessing
import logging
import sys

def worker():
  print 'Doing some work'
  sys.stdout.flush()

if __name__ == '__main__':
  multiprocessing.log_to_stderr()
  logger = multiprocessing.get_logger()
  logger.setLevel(logging.INFO)
  p = multiprocessing.Process(target=worker)
  p.start()
  p.join()

Processes can also be created from a custom class by subclassing multiprocessing.Process and overriding run().

Deriving a Process subclass:

import multiprocessing

class Worker(multiprocessing.Process):
  def run(self):
    print 'In %s' % self.name
    return

if __name__ == '__main__':
  jobs = []
  for i in range(5):
    p = Worker()
    jobs.append(p)
    p.start()
  for j in jobs:
    j.join()

Python inter-process message passing:

import multiprocessing

class MyFancyClass(object):
  def __init__(self, name):
    self.name = name
  def do_something(self):
    proc_name = multiprocessing.current_process().name
    print 'Doing something fancy in %s for %s!' % \
      (proc_name, self.name)

def worker(q):
  obj = q.get()
  obj.do_something()

if __name__ == '__main__':
  queue = multiprocessing.Queue()
  p = multiprocessing.Process(target=worker, args=(queue,))
  p.start()
  queue.put(MyFancyClass('Fancy Dan'))
  # Wait for the worker to finish
  queue.close()
  queue.join_thread()
  p.join()

A more complete producer/consumer example uses a JoinableQueue for tasks, a second Queue for results, and a poison pill (None) to tell each consumer to shut down:

import multiprocessing
import time

class Consumer(multiprocessing.Process):
  def __init__(self, task_queue, result_queue):
    multiprocessing.Process.__init__(self)
    self.task_queue = task_queue
    self.result_queue = result_queue
  def run(self):
    proc_name = self.name
    while True:
      next_task = self.task_queue.get()
      if next_task is None:
        # Poison pill means shutdown
        print '%s: Exiting' % proc_name
        self.task_queue.task_done()
        break
      print '%s: %s' % (proc_name, next_task)
      answer = next_task()
      self.task_queue.task_done()
      self.result_queue.put(answer)
    return

class Task(object):
  def __init__(self, a, b):
    self.a = a
    self.b = b
  def __call__(self):
    time.sleep(0.1)  # pretend to take some time to do the work
    return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
  def __str__(self):
    return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
  # Establish communication queues
  tasks = multiprocessing.JoinableQueue()
  results = multiprocessing.Queue()
  # Start consumers
  num_consumers = multiprocessing.cpu_count() * 2
  print 'Creating %d consumers' % num_consumers
  consumers = [ Consumer(tasks, results)
                for i in xrange(num_consumers) ]
  for w in consumers:
    w.start()
  # Enqueue jobs
  num_jobs = 10
  for i in xrange(num_jobs):
    tasks.put(Task(i, i))
  # Add a poison pill for each consumer
  for i in xrange(num_consumers):
    tasks.put(None)
  # Wait for all of the tasks to finish
  tasks.join()
  # Start printing results
  while num_jobs:
    result = results.get()
    print 'Result:', result
    num_jobs -= 1

Event provides a simple way to pass state information between processes. An event can be toggled between the set and unset states. Using an optional timeout value, the user of an event object can wait for its state to change from unset to set.

Inter-process signaling:

import multiprocessing
import time

def wait_for_event(e):
  """Wait for the event to be set before doing anything"""
  print 'wait_for_event: starting'
  e.wait()
  print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):
  """Wait t seconds and then timeout"""
  print 'wait_for_event_timeout: starting'
  e.wait(t)
  print 'wait_for_event_timeout: e.is_set()->', e.is_set()

if __name__ == '__main__':
  e = multiprocessing.Event()
  w1 = multiprocessing.Process(name='block',
                               target=wait_for_event,
                               args=(e,))
  w1.start()
  w2 = multiprocessing.Process(name='nonblock',
                               target=wait_for_event_timeout,
                               args=(e, 2))
  w2.start()
  print 'main: waiting before calling Event.set()'
  time.sleep(3)
  e.set()
  print 'main: event is set'

In Python multiprocessing, the usual way to pass data between processes is a Queue.

Queue:

from multiprocessing import Process, Queue

def f(q):
  q.put([42, None, 'hello'])

if __name__ == '__main__':
  q = Queue()
  p = Process(target=f, args=(q,))
  p.start()
  print q.get()  # prints "[42, None, 'hello']"
  p.join()

A queue shared by multiple threads (the Queue module):

import Queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
  def __init__(self, threadID, name, q):
    threading.Thread.__init__(self)
    self.threadID = threadID
    self.name = name
    self.q = q
  def run(self):
    print "Starting " + self.name
    process_data(self.name, self.q)
    print "Exiting " + self.name

def process_data(threadName, q):
  while not exitFlag:
    queueLock.acquire()
    if not workQueue.empty():
      data = q.get()
      queueLock.release()
      print "%s processing %s" % (threadName, data)
    else:
      queueLock.release()
    time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# Create new threads
for tName in threadList:
  thread = myThread(threadID, tName, workQueue)
  thread.start()
  threads.append(thread)
  threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
  workQueue.put(word)
queueLock.release()

# Wait for the queue to empty
while not workQueue.empty():
  pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
  t.join()
print "Exiting Main Thread"

Example of multi-process communication using Queue:

import time
from multiprocessing import Process, Queue

MSG_QUEUE = Queue(5)

def startA(msgQueue):
  while True:
    if msgQueue.empty():
      print('queue is empty %d' % (msgQueue.qsize()))
    else:
      msg = msgQueue.get()
      print('get msg %s' % (msg,))
    time.sleep(1)

def startB(msgQueue):
  while True:
    msgQueue.put('hello world')
    print('put hello world queue size is %d' % (msgQueue.qsize(),))
    time.sleep(3)

if __name__ == '__main__':
  processA = Process(target=startA, args=(MSG_QUEUE,))
  processB = Process(target=startB, args=(MSG_QUEUE,))
  processA.start()
  print('processA start..')
  processB.start()
  print('processB start..')

The main process defines a Queue and passes it to the child processes processA and processB through the args parameter of Process; one of the two processes writes data to the queue and the other reads data from it.


I hope this article is helpful for your Python programming.