
One article to take you through Python multiprocessing

multiprocessing module

Python multiprocessing is implemented through the multiprocessing package and works much like multithreading: you create a process by instantiating a Process object. The methods of a Process object are almost identical to those of a Thread object, such as start(), run(), join(), and so on. One difference is the daemon setting: a Thread is made a daemon with the setDaemon() method, whereas a Process is made a daemon by setting its daemon attribute.

Here is how Python multiprocessing is implemented; it looks very similar to multithreading.

Multiprocessing implementation 1

from multiprocessing import Process

def fun1(name):
    print('Test %s multiprocessing' % name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):  # Start 5 child processes to execute the fun1 function
        p = Process(target=fun1, args=('Python',))  # Instantiate a process object
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

    print('End Test')

Result:

Test Python multiprocessing
Test Python multiprocessing
Test Python multiprocessing
Test Python multiprocessing
Test Python multiprocessing
End Test
Process finished with exit code 0

The code above starts 5 child processes to execute the function. From the output we can see that they print at roughly the same time, which is true parallelism: more than one CPU core executes tasks simultaneously. Remember that the process is the smallest unit of resource allocation in Python; data and memory are not shared between processes, and every newly started process must independently allocate resources and copy the data it accesses. Starting and destroying processes is therefore relatively expensive, so in practice the number of processes should be chosen according to the server configuration.
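
To make the "memory is not shared" point concrete, here is a minimal sketch (the variable name counter is only illustrative): the child process modifies its own copy of a module-level variable, and the main process never sees the change.

from multiprocessing import Process

counter = 0  # module-level variable in the main process

def increment():
    global counter
    counter += 100
    print('In the child process, counter =', counter)  # 100, but only in the child's copy

if __name__ == '__main__':
    p = Process(target=increment)
    p.start()
    p.join()
    print('In the main process, counter =', counter)  # still 0, the child's change is not visible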

Multiprocessing implementation 2

Remember the second implementation of Python multithreading? It was done by inheriting from a class, and the second implementation of Python multiprocessing works the same way.

from multiprocessing import Process

class MyProcess(Process):  # Inherit the Process class
    def __init__(self, name):
        super(MyProcess, self).__init__()
        self.name = name

    def run(self):
        print('Test %s multiprocessing' % self.name)


if __name__ == '__main__':
    process_list = []
    for i in range(5):  # Start 5 child processes
        p = MyProcess('Python')  # Instantiate a process object
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

    print('End Test')

Result:

Test Python multiprocessing
Test Python multiprocessing
Test Python multiprocessing
Test Python multiprocessing
Test Python multiprocessing
End Test
Process finished with exit code 0

The effect is the same as with the first method.

We can see that Python multiprocessing is implemented in almost the same way as multithreading.

Other methods of the Process class

Constructor:

Process([group [, target [, name [, args [, kwargs]]]]])

group: process group; in practice it is not used and should be left as None

target: the method to be executed

name: Process name

args/kwargs: arguments to be passed into the method

Instance Methods:

is_alive(): returns whether the process is still running (bool).

join([timeout]): blocks the calling process until the process whose join() is called terminates, or until the optional timeout expires.

start(): makes the process ready and waiting to be scheduled by the CPU.

run(): start() calls the run() method; if the instance was not given a target, start() executes the default run() method.

terminate(): stops the worker process immediately, whether or not its task has completed.

Properties:

daemon: same function as setDaemon() for threads

name: process name

pid: process number

The use of join and daemon is the same as in Python multithreading, so I won't rehash it here.
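
Still, for completeness, below is a minimal sketch of the daemon attribute; setting it before start() has the same effect as setDaemon(True) for a thread, and the daemon child is killed as soon as the main process exits.

from multiprocessing import Process
import time

def worker():
    time.sleep(10)
    print('Never printed: the daemon child is terminated when the main process exits')

if __name__ == '__main__':
    p = Process(target=worker)
    p.daemon = True  # must be set before start()
    p.start()
    time.sleep(1)
    print('Main process exits; the daemon child is terminated with it')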

Multiprocess communication

Processes are the basic unit the system uses for independent scheduling and for allocating system resources (CPU, memory). Processes are independent of each other: every newly started process effectively gets a clone of the parent's data, so changes to data in a child process cannot affect the data in the main process, and data cannot be shared between different child processes. This is the most obvious difference between using multiple processes and multiple threads. Does that mean Python processes are completely isolated? Of course not; Python provides several ways to implement inter-process communication and data sharing (so that processes can modify the same piece of data).

Process Queue

Queue also came up in the multithreading article: used in the producer-consumer pattern, it is thread-safe and acts as the data pipeline between producer and consumer. In Python multiprocessing, Queue is likewise a data pipeline between processes, used to implement inter-process communication.

from multiprocessing import Process, Queue


def fun1(q, i):
    print('Child process %s starting put data' % i)
    q.put('I am %s communicating via Queue' % i)

if __name__ == '__main__':
    q = Queue()

    process_list = []
    for i in range(3):
        p = Process(target=fun1, args=(q, i,))  # Note: pass the q object in args to the target function so the child process can communicate with the master process via the Queue
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

    print('Master process gets Queue data')
    print(q.get())
    print(q.get())
    print(q.get())
    print('End Test')

Result:

Child process 0 starting put data
Child process 1 starting put data
Child process 2 starting put data
Master process gets Queue data
I am 0 communicating via Queue
I am 1 communicating via Queue
I am 2 communicating via Queue
End Test
Process finished with exit code 0

From the result we can see that the main process gets the data the child processes put into the Queue, achieving inter-process communication.
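
One detail worth knowing: q.get() blocks when the queue is empty. If you are not sure how many items were put, you can pass a timeout and catch queue.Empty, as in this small sketch:

import queue  # only for the Empty exception raised on timeout
from multiprocessing import Queue

q = Queue()
q.put('only one item')

print(q.get())               # returns immediately
try:
    print(q.get(timeout=1))  # queue is empty: waits 1 second, then raises queue.Empty
except queue.Empty:
    print('Queue is empty, no more data')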

Pipe

Pipe serves much the same purpose as Queue: it also implements inter-process communication. Let's look at how it is used.

from multiprocessing import Process, Pipe

def fun1(conn):
    print('Child process sends message:')
    conn.send('Hello Master Process')
    print('Child process receives message:')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe()  # Key point: instantiating Pipe produces a bidirectional pipe
    p = Process(target=fun1, args=(conn2,))  # conn2 is passed to the child process
    p.start()
    print('Master process receives message:')
    print(conn1.recv())
    print('Master process sends message:')
    conn1.send('Hello child process')
    p.join()
    print('End Test')

Result:

Master process receives message:
Child process sends message:
Child process receives message:
Hello Master Process
Master process sends message:
Hello child process
End Test
Process finished with exit code 0

Above you can see that the master process and the child process can send messages to each other.
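
By default Pipe() produces a two-way (duplex) pipe; if you only need one-way communication you can pass duplex=False, as in this minimal sketch:

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send('one-way message from the child')
    conn.close()

if __name__ == '__main__':
    recv_conn, send_conn = Pipe(duplex=False)  # recv_conn can only receive, send_conn can only send
    p = Process(target=sender, args=(send_conn,))
    p.start()
    print(recv_conn.recv())
    p.join()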

Managers

Queue and Pipe only implement data exchange; they do not implement data sharing, that is, letting one process modify another process's data. For that we need Managers.

from multiprocessing import Process, Manager

def fun1(dic, lis, index):

    dic[index] = 'a'
    dic['2'] = 'b'
    lis.append(index)    # [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    # print(lis)

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()  # Note how the dictionary is declared; it cannot be defined directly via {}
        l = manager.list(range(5))  # [0, 1, 2, 3, 4]

        process_list = []
        for i in range(10):
            p = Process(target=fun1, args=(dic, l, i))
            p.start()
            process_list.append(p)

        for res in process_list:
            res.join()
        print(dic)
        print(l)

Results:

{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]

You can see that the main process defines a dictionary and a list; in the child processes, you can add to and modify the contents of the dictionary and insert new data into the list. This achieves inter-process data sharing, i.e., the processes can jointly modify the same data.
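
Keep in mind that shared data is not automatically safe to modify concurrently: a read-modify-write such as incrementing a counter can lose updates if several processes do it at the same time. Below is a minimal sketch (the 'counter' key is only illustrative) that protects the shared dictionary with a manager Lock:

from multiprocessing import Process, Manager

def add_one(dic, lock):
    for _ in range(100):
        with lock:  # without the lock, concurrent read-modify-write could lose updates
            dic['counter'] += 1

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict({'counter': 0})
        lock = manager.Lock()
        process_list = [Process(target=add_one, args=(dic, lock)) for _ in range(4)]
        for p in process_list:
            p.start()
        for p in process_list:
            p.join()
        print(dic['counter'])  # 400 with the lock; without it the total may come up short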

Process pool

A process pool maintains a set of processes. When one is needed, it is taken from the pool; if no process in the pool is available, the program waits until one becomes free. In other words, there is a fixed number of processes available for use.

There are two methods in the process pool:

  • apply: synchronization, generally not used
  • apply_async: asynchronous
from multiprocessing import Process, Pool
import os, time, random

def fun1(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
    pool = Pool(5)  # Create a pool of 5 processes

    for i in range(10):
        pool.apply_async(func=fun1, args=(i,))

    pool.close()
    pool.join()
    print('End Test')

Result:

Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
End Test

Calling join() on the Pool object waits for all child processes to finish executing. You must call close() before join(), and once close() has been called no new processes can be submitted to the pool.
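
apply_async() also returns an AsyncResult object, so if you need the return values of the tasks you can keep those objects and call get() on them once the pool has finished, as in this minimal sketch:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(5)
    results = [pool.apply_async(square, args=(i,)) for i in range(10)]
    pool.close()
    pool.join()
    print([r.get() for r in results])  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]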

Process pool map method

The example comes from the internet; please let me know if there is any infringement, thank you!

I saw this example online and thought it was good, so rather than writing my own case here I'll use this one; it is more convincing.

import os
import PIL

from multiprocessing import Pool
from PIL import Image

SIZE = (75, 75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)

def create_thumbnail(filename):
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.LANCZOS)  # LANCZOS (formerly ANTIALIAS) resampling
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(create_thumbnail, images)  # Key point: images is an iterable object
    pool.close()
    pool.join()

The main job of the code above is to iterate over the image files in the given folder, generate a thumbnail for each one, and save the thumbnails to a specific folder. On the original author's machine, processing 6000 images with this program took 27.9 seconds. With map there is no manual process management, which also makes debugging very easy.

map can also be used for crawling, for example fetching the content of multiple URLs: just put the URLs into a tuple (or list) and pass it to the worker function, as sketched below.
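
As a rough illustration of that idea (the URLs below are only placeholders), here is a minimal sketch that fetches several pages with pool.map and urllib:

from multiprocessing import Pool
from urllib.request import urlopen

URLS = (
    'https://www.python.org',
    'https://www.example.com',
)

def fetch(url):
    with urlopen(url) as resp:
        return url, len(resp.read())  # use the page size as a simple result

if __name__ == '__main__':
    pool = Pool(2)
    for url, size in pool.map(fetch, URLS):
        print('%s -> %d bytes' % (url, size))
    pool.close()
    pool.join()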

The above is one article taking you through the details of Python multiprocessing. For more about Python multiprocessing, please check out my other related articles!