
ThreadPoolExecutor thread pools and ProcessPoolExecutor process pools in Python

1. ThreadPoolExecutor multi-threading

<1> Why do you need a thread pool?

  • For IO-intensive tasks, a pool improves execution efficiency.
  • Creating threads consumes system resources, so threads should be reused rather than created per task.

So the idea of a thread pool is: each thread is assigned one task at a time while the remaining tasks wait in a queue; when a thread finishes its task, a queued task is scheduled onto that thread for execution.
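A minimal sketch of that queueing idea (my own example, not from the original): with a single-worker pool, three submitted tasks run one after another on the same reused thread.

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time

def work(n):
    # Each task reports which pooled thread picked it up
    print("task {} running on {}".format(n, current_thread().name))
    time.sleep(1)

# One worker only: the other two tasks wait in the queue
with ThreadPoolExecutor(max_workers=1) as pool:
    for n in range(3):
        pool.submit(work, n)
# All three lines print the same thread name, about one second apart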

<2> The concurrent.futures standard library module

The concurrent.futures module provides two classes, ThreadPoolExecutor and ProcessPoolExecutor.
They are further abstractions over the threading module and the multiprocessing module, respectively.

Not only does the module schedule threads for us automatically, it also:

  • lets the main thread get the state of a thread (or task) as well as its return value
  • lets the main thread know immediately when a thread finishes
  • makes the multi-threaded and multi-process coding interfaces consistent (see the sketch after this list)
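To illustrate that last point, here is a small sketch of my own (double() and run_in() are made-up names): the same code can drive either pool, because both classes implement the same Executor interface.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def double(x):
    return x * 2

def run_in(executor_cls):
    # Identical code works for threads and for processes
    with executor_cls(max_workers=2) as pool:
        return list(pool.map(double, [1, 2, 3]))

if __name__ == '__main__':
    print(run_in(ThreadPoolExecutor))   # [2, 4, 6]
    print(run_in(ProcessPoolExecutor))  # [2, 4, 6]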

<3> Simple usage

# -*-coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor

# The parameter times stands for a simulated network request time
def get_html(times):
    print("get page {}s finished".format(times))
    return times

# Create a thread pool
# max_workers sets the maximum number of threads that may run simultaneously; the rest wait
executor = ThreadPoolExecutor(max_workers=2)
# Submit the function to the thread pool via submit(), which returns immediately without blocking
# task1 and task2 are task handles (Future objects)
task1 = executor.submit(get_html, 2)
task2 = executor.submit(get_html, 3)

# done() reports whether a task has completed: returns True if finished, False if not
print(task1.done())
# cancel() cancels a task that has not yet been handed to a thread; a task that is already
# running (or finished) cannot be cancelled
# returns True if the cancellation succeeded, False if it did not
print(task2.cancel())
# result() fetches the result of the task, provided get_html() has a return value
print(task1.result())
print(task2.result())
# Results:
# get page 3s finished
# get page 2s finished
# True
# False
# 2
# 3

When constructing a ThreadPoolExecutor instance, pass the max_workers parameter to set the maximum number of threads that can run concurrently in the pool.
Use the submit() method to hand a task (function name and arguments) to the thread pool; it returns a handle (a Future object) for that task.

Note: submit() does not block; it returns immediately.

Through the task handle returned by submit(), you can call done() to check whether the task has finished, cancel() to cancel it, and result() to get the task's return value. Looking at the internal code, result() is a blocking method.
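As a sketch of the cancel() rule above (my own variant of the earlier example, with a sleep added so the second task stays queued): with max_workers=1 the second task has not yet been handed to a thread, so it can still be cancelled.

from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=1)
task1 = executor.submit(get_html, 2)  # starts immediately on the only worker
task2 = executor.submit(get_html, 3)  # still waiting in the queue
print(task2.cancel())  # True: task2 never started, so the cancellation succeeds
print(task1.result())  # 2: result() blocks until task1 finishes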

<4> as_completed() (get the results of all tasks)

The methods above let us check whether a task has finished, but it is awkward to keep polling in the main thread. Often we would rather be told when a task finishes and fetch its result then, instead of repeatedly checking each task. For this you can use the as_completed() function to collect the results of all tasks in a single loop.

# -*-coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# The parameter times simulates the network request time
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

# Create a thread pool
# At most 2 threads run at a time; the others wait
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
# Put all tasks into the thread pool at once and keep the handles; still at most 2 execute at a time
all_task = [executor.submit(get_html, each_url) for each_url in urls]

for future in as_completed(all_task):
    data = future.result()
    print("in main:get page {}s success".format(data))

# Results
# get page 2s finished
# in main:get page 2s success
# get page 3s finished
# in main:get page 3s success
# get page 4s finished
# in main:get page 4s success
# As you can see from the results, the urls do not finish in the order they were passed in:
# as_completed() yields each task as it completes, in no particular order

<5> map() method

In addition to the as_completed() function above, you can also use the executor.map() method. One difference: with map() you do not need to call submit() beforehand.
The map() method has the same meaning as the built-in map() in Python: it executes the same function for each element of a sequence. The code below runs get_html() for each element of the urls list, scheduling each call onto the thread pool. Note that the result differs from the as_completed() example: the output order matches the order of the urls list. Even though the 2s task finishes first, map() yields the 3s result first (3 comes first in urls) and the 2s result after it.

# -*-coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time

# The parameter times simulates the network request time
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

# Create a thread pool
# At most 2 threads run at a time; the others wait
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
for result in executor.map(get_html, urls):
    print("in main:get page {}s success".format(result))

Results:

 get page 2s finished
 get page 3s finished
 in main:get page 3s success
 in main:get page 2s success
 get page 4s finished
 in main:get page 4s success

<6> wait() method

The wait() function lets the main thread block until the given condition is met. wait() takes three parameters: the sequence of tasks to wait for, a timeout, and a wait condition.
The wait condition return_when defaults to ALL_COMPLETED, meaning we wait until all the tasks have completed. The run results confirm that all tasks do complete before the main thread prints "main". The condition can also be set to FIRST_COMPLETED, meaning waiting stops as soon as the first task completes.

The timeout parameter may be left unset.

The wait() function is independent of as_completed() and map(): whichever of those you use, you can still call wait() before the main thread proceeds.
as_completed() and map(), by contrast, are alternatives; use one or the other.

# -*-coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time

# The parameter times simulates the network request time
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

# Create a thread pool
# At most 2 threads run at a time; the others wait
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)
print("main")
# Results
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main
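As a variant of the same example (my own sketch), setting return_when=FIRST_COMPLETED makes wait() return as soon as the quickest task finishes, so "main" prints after roughly 2 seconds while the 3s and 4s tasks are still running; the timeout parameter is shown here as well.

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
all_task = [executor.submit(get_html, url) for url in [3, 2, 4]]
# Stop waiting once the first task completes, or after 10 seconds at most
wait(all_task, timeout=10, return_when=FIRST_COMPLETED)
print("main")
# get page 2s finished
# main
# get page 3s finished
# get page 4s finished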

2. ProcessPoolExecutor multiprocessing

<1> Synchronous calls: submit, then wait for the return value. Decoupled, but slow.

import datetime
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import time, random, os
import requests

def task(name):
    print('%s %s is running' % (name, os.getpid()))
    # print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == '__main__':
    p = ProcessPoolExecutor(4)  # Set the number of processes in the pool

    for i in range(10):
        # Synchronous call: not only submit the task, but also wait for its return value
        obj = p.submit(task, "Process pid:")  # submit(fn, *args): arguments may be positional or keyword
        res = obj.result()
    p.shutdown(wait=True)  # Close the pool to new tasks and wait for the tasks in it to finish
    print("main")
################
################
# Another demo of a synchronous call
def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = "Download failed."
    return res  # Has a return value

def parse(res):
    time.sleep(1)
    print("%s parsed as %s." % (os.getpid(), len(res)))

if __name__ == "__main__":
    urls = [
        # (the nine urls are omitted here)
        '',
        '',
        '',
        '',
        '',
        '',
        '',
        '',
        '',
    ]
    p = ProcessPoolExecutor(9)
    l = []
    start = time.time()
    for url in urls:
        future = p.submit(get, url)  # We need the results, so this is used as a synchronous call
        l.append(future)

    # Close the process pool and wait for all processes to finish executing
    p.shutdown(wait=True)
    for future in l:
        parse(future.result())
    print('Completion time:', time.time() - start)
    # Completion time: 13.209137678146362

<2> Asynchronous calls: submit only, do not wait for the return value. There may be coupling, but it is fast.

def task(name):
    print("%s %s is running" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))

if __name__ == '__main__':
    p = ProcessPoolExecutor(4)  # Set the number of processes in the pool
    for i in range(10):
        # Asynchronous call: submit only, do not wait for the return value
        p.submit(task, 'Process pid:')  # submit(fn, *args): arguments may be positional or keyword
    p.shutdown(wait=True)  # Close the pool to new tasks and wait for the tasks in it to finish
    print('main')
##################
##################
# Another demo of an asynchronous call
def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = "Download failed."
    parse(res)  # No return value

def parse(res):
    time.sleep(1)
    print('%s parsed as %s' % (os.getpid(), len(res)))

if __name__ == '__main__':
    urls = [
        # (the nine urls are omitted here)
        '',
        '',
        '',
        '',
        '',
        '',
        '',
        '',
        '',
    ]
    p = ProcessPoolExecutor(9)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
    p.shutdown(wait=True)
    print("Completion time", time.time() - start)  # Completion time 6.293345212936401

<3> How can we use asynchronous calls while avoiding the coupling problem?

(1) Process pool: asynchronous call + callback function. Suited to CPU-intensive work; tasks execute simultaneously, and each process has its own interpreter and memory space, so processes do not interfere with each other.

def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'Download failed'
    return res

def parse(future):
    time.sleep(1)
    # A Future object is passed in; call result() to get the task's return value
    res = future.result()
    print('%s parsed as %s' % (os.getpid(), len(res)))

if __name__ == '__main__':
    urls = [
        # (the nine urls are omitted here)
        '',
        '',
        '',
        '',
        '',
        '',
        '',
        '',
        '',
    ]
    p = ProcessPoolExecutor(9)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        # Register parse as the callback: when the task finishes, the module calls the
        # callback with the future object, i.e. the equivalent of parse(future),
        # and future.result() inside the callback is the task's return value
        future.add_done_callback(parse)
    p.shutdown(wait=True)
    print("Completion time", time.time() - start)  # Completion time 33.79998469352722

(2) Thread pool: asynchronous call + callback function. The main approach for IO-intensive work; in a thread pool, whichever thread is free picks up the next operation.

def get(url):
    print("%s GET %s" % (current_thread().name, url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = "Download failed."
    return res

def parse(future):
    time.sleep(1)
    res = future.result()
    print("%s parsed as %s." % (current_thread().name, len(res)))

if __name__ == '__main__':
    urls = [
        # (the nine urls are omitted here)
        '',
        '',
        '',
        '',
        '',
        '',
        '',
        '',
        '',
    ]
    p = ThreadPoolExecutor(4)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        future.add_done_callback(parse)
    p.shutdown(wait=True)
    print("main", current_thread().name)
    print("Completion time", time.time() - start)  # Completion time 32.52604126930237

3. Summary

  • 1. More threads are not always better: extra threads mean more CPU context switching (the CPU must save the state of the previous thread each time it switches).
  • 2. A process consumes more resources than a thread. A process is like a factory: the people inside it share the factory's resources. A process has only one main thread by default; opening a program starts a process, and the threads inside it do the work, so creating threads is like the same factory taking on more workers.
  • 3. Threads are subject to the GIL (Global Interpreter Lock), which prevents the CPU from scheduling more than one Python thread at a time.
  • 4. Computation-intensive (CPU-bound) work is suited to multiple processes (see the sketch after this list).
  • 5. Thread: the smallest unit of execution that the CPU schedules.
  • 6. Process: has one main thread by default, and multiple threads can coexist within it.
  • 7. Coroutine: using a single thread of a single process to perform multiple tasks; a "micro-thread".
  • 8. GIL (Global Interpreter Lock): ensures that only one thread is scheduled by the CPU at a time.
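To illustrate points 3, 4 and 8, here is a rough benchmark sketch of my own (count() is a made-up CPU-bound function; the timings depend on the machine): because of the GIL, a pure-computation loop gains nothing from a thread pool, while a process pool can use several cores at once.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def count(n):
    # Pure CPU work: the GIL lets only one thread execute this at a time
    while n > 0:
        n -= 1

if __name__ == '__main__':
    jobs = [10_000_000] * 4
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with cls(max_workers=4) as pool:
            list(pool.map(count, jobs))
        print(cls.__name__, time.time() - start)
    # On a multi-core machine the process pool is typically several times
    # faster here; for IO-bound work the thread pool does just as well,
    # without the cost of spawning processes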

This concludes this article on Python's ThreadPoolExecutor thread pool and ProcessPoolExecutor process pool. For more on ThreadPoolExecutor in Python, please search my previous articles or continue browsing the related articles below. I hope you will continue to support me!