SoFunction
Updated on 2024-11-13

Python coroutines and the asyncio library in detail

Preface:

Coroutine support was added to Python in version 3.4, but that version implemented coroutines with generators. To distinguish coroutines from ordinary generator usage and to make the semantics clearer, Python 3.5 added the async and await keywords for defining native coroutines.

Asynchronous I/O Library

The asyncio library in Python provides methods for managing events, coroutines, tasks, and threads, as well as the primitives for writing concurrent code: async and await.

Key elements of the module:

  • Event loop: event_loop manages all events. It is an infinite loop that tracks the order in which events occur, places them in a queue, and calls the appropriate event handler when idle;
  • Coroutine: coroutine generalizes the concept of a subroutine. A coroutine can pause during execution, wait for external processing (such as an I/O operation) to complete, and then resume from the point where it paused. A function defined with the async keyword is not executed immediately when called; it returns a coroutine object instead;
  • Future and Task: a Future object represents a computation that has not yet completed. Task is a subclass of Future that holds the state of a task; its purpose is to let multiple tasks run concurrently while a particular task is waiting.
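The three elements above can be seen together in a minimal sketch. The names square and inspect_task are hypothetical, and asyncio.run() (Python 3.7+) is used here only as a convenient way to start an event loop.

```python
import asyncio

async def square(x):
    # A coroutine: it pauses at await and later resumes from the same point
    await asyncio.sleep(0.01)
    return x ** 2

async def inspect_task():
    task = asyncio.create_task(square(3))        # wrap the coroutine in a Task
    is_future = isinstance(task, asyncio.Future)  # Task is a subclass of Future
    done_before = task.done()                     # the task has not finished yet
    result = await task                           # let the event loop run it
    return is_future, done_before, task.done(), result

print(asyncio.run(inspect_task()))
```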

Definition of an asynchronous function

An asynchronous function is still essentially a function, except that it can hand over execution to other coroutines while it runs. Its definition differs from that of a normal function only in that the async keyword is added before def.

import asyncio
# Asynchronous function
async def func(x):
    print("Asynchronous functions")
    return x ** 2
ret = func(2)
print(ret)

Running the code produces the following output:

sys:1: RuntimeWarning: coroutine 'func' was never awaited
<coroutine object func at 0x0000000002C8C248>

The function returns a coroutine object. For the function body to actually execute, the coroutine object must be placed in the event loop (event_loop).
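A small sketch can make this visible: calling the asynchronous function does not run its body until an event loop drives the coroutine object. The helper name coroutine_demo is hypothetical, and asyncio.run() stands in for whichever way the loop is started.

```python
import asyncio

def coroutine_demo():
    calls = []

    async def func(x):
        calls.append(x)            # records that the body actually ran
        return x ** 2

    coro = func(2)                 # creates a coroutine object; body not run yet
    ran_before = bool(calls)
    is_coroutine = asyncio.iscoroutine(coro)
    result = asyncio.run(coro)     # the event loop executes the coroutine
    return ran_before, is_coroutine, bool(calls), result

print(coroutine_demo())
```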

Event Loop event_loop

event_loop is the core of the asyncio module: asynchronous functions are registered with the event loop, and the loop calls each coroutine at the appropriate time. The approach used here is to call asyncio.get_event_loop() to obtain the loop and then run_until_complete(coroutine object), which registers the coroutine with the event loop and starts the loop.

import asyncio
# Asynchronous functions
async def func(x):
    print("Asynchronous functions")
    return x ** 2
# Coroutine object; it cannot be run directly
coroutine1 = func(2)
# Event loop object
loop = asyncio.get_event_loop()
# Register the coroutine object with the event loop and run it
ret = loop.run_until_complete(coroutine1)
print(ret)

Before Python 3.7, asynchronous functions were run by following the process above:

  • first, get the event loop object loop through asyncio.get_event_loop();
  • then, depending on the strategy, call loop.run_until_complete() or loop.run_forever() to execute the asynchronous function.
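The two strategies differ in when the loop stops: run_until_complete() returns once the awaitable it was given finishes, while run_forever() keeps running until loop.stop() is called. A minimal sketch of the second case follows; the name run_forever_demo is hypothetical, and asyncio.new_event_loop() is used instead of get_event_loop() as an assumption, so that the example does not depend on a loop already being set for the thread.

```python
import asyncio

def run_forever_demo():
    loop = asyncio.new_event_loop()
    log = []

    async def job():
        await asyncio.sleep(0.01)
        log.append("job finished")
        loop.stop()                # ask run_forever() to return

    try:
        loop.create_task(job())    # scheduled; nothing runs until the loop starts
        loop.run_forever()         # runs until loop.stop() is called
    finally:
        loop.close()
    return log

print(run_forever_demo())
```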

In Python 3.7 and later, you can simply call asyncio.run(). This function always creates a new event loop and closes it at the end.

The updated official documentation uses the run method throughout. The official example:

import asyncio
async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')
asyncio.run(main())

Next, let's look at a complete example that uses the await keyword.

import asyncio
import time
# Asynchronous function 1
async def task1(x):
    print("Task 1")
    await asyncio.sleep(2)
    print("Recovery mission 1")
    return x
# Asynchronous function 2
async def task2(x):
    print("Task 2")
    await asyncio.sleep(1)
    print("Recovery mission 2")
    return x
async def main():
    start_time = time.perf_counter()
    ret_1 = await task1(1)
    ret_2 = await task2(2)
    print("Task 1 returns a value of", ret_1)
    print("Task 2 returns the value of", ret_2)
    print("Running time", time.perf_counter() - start_time)
if __name__ == '__main__':
    # Create an event loop
    loop = asyncio.get_event_loop()
    # Register the coroutine object with the event loop and run it
    loop.run_until_complete(main())

The code output is shown below:

Task 1
Recovery mission 1
Task 2
Recovery mission 2
Task 1 returns a value of 1
Task 2 returns the value of 2
Running time 2.99929154

The above code creates three coroutines, of which task1 and task2 are awaited inside the coroutine function main. The I/O operations are simulated with asyncio.sleep(). The whole program takes about 2.999 seconds, which is close to 3 seconds, so execution is still serial. To make it concurrent, change the code as follows.

import asyncio
import time
# Asynchronous function 1
async def task1(x):
    print("Task 1")
    await asyncio.sleep(2)
    print("Recovery mission 1")
    return x
# Asynchronous function 2
async def task2(x):
    print("Task 2")
    await asyncio.sleep(1)
    print("Recovery mission 2")
    return x
async def main():
    start_time = time.perf_counter()
    ret_1, ret_2 = await asyncio.gather(task1(1), task2(2))
    print("Task 1 returns a value of", ret_1)
    print("Task 2 returns the value of", ret_2)
    print("Running time", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

The biggest change in the above code is that task1 and task2 are passed to asyncio.gather() to run, at which point the running time is significantly shorter.

Task 1
Task 2
Recovery mission 2  # Task 2 resumes first because its wait time is shorter.
Recovery mission 1
Task 1 returns a value of 1
Task 2 returns the value of 2
Running time 2.0005669480000003
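Note that in the output above the return values still line up with the order of the arguments, even though task 2 finished first. The sketch below illustrates this property of asyncio.gather(); the names delayed and gather_order and the delays are hypothetical.

```python
import asyncio

async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value

async def gather_order():
    finished = []

    async def tracked(value, delay):
        result = await delayed(value, delay)
        finished.append(result)    # records the completion order
        return result

    # Results come back in argument order, not completion order
    results = await asyncio.gather(tracked("first", 0.05), tracked("second", 0.01))
    return results, finished

print(asyncio.run(gather_order()))
```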

asyncio.gather() can be replaced with asyncio.wait(). The modified code is shown below:

import asyncio
import time
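# Asynchronous function 1
async def task1(x):
    print("Task 1")
    await asyncio.sleep(2)
    print("Recovery mission 1")
    return x
# Asynchronous function 2
async def task2(x):
    print("Task 2")
    await asyncio.sleep(1)
    print("Recovery mission 2")
    return x
async def main():
    start_time = time.perf_counter()
    # asyncio.wait() expects tasks; bare coroutine objects are rejected on Python 3.11+
    done, pending = await asyncio.wait(
        [asyncio.create_task(task1(1)), asyncio.create_task(task2(2))])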
    print(done)
    print(pending)
    print("Running time", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

asyncio.wait() returns a tuple containing the set of completed tasks and the set of pending tasks.
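The pending set is only non-empty when the wait ends before every task has finished. As a rough sketch, the timeout parameter of asyncio.wait() is one way to see both sets populated; the names sleeper and wait_with_timeout and the delays are hypothetical.

```python
import asyncio

async def sleeper(name, delay):
    await asyncio.sleep(delay)
    return name

async def wait_with_timeout():
    tasks = [asyncio.create_task(sleeper("fast", 0.01)),
             asyncio.create_task(sleeper("slow", 0.5))]
    # Stop waiting after 0.1 s; unfinished tasks are returned in pending
    done, pending = await asyncio.wait(tasks, timeout=0.1)
    summary = ([t.result() for t in done], len(pending))
    # wait() does not cancel pending tasks, so clean them up explicitly
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return summary

print(asyncio.run(wait_with_timeout()))
```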

Difference between gather and wait:

  • gather: requires all tasks to finish; if any coroutine raises, the exception is propagated to the caller and no results are returned;
  • wait: lets you choose when the function returns through return_when, which can be set to FIRST_COMPLETED (the first task to finish), FIRST_EXCEPTION (the first exception), or ALL_COMPLETED (all tasks finished, the default).
done, pending = await asyncio.wait([asyncio.create_task(task1(1)), asyncio.create_task(task2(2))], return_when=asyncio.FIRST_EXCEPTION)
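The difference in error handling can be sketched as follows. The coroutines ok and boom and their delays are hypothetical; the point is only that gather() re-raises the exception, while wait() hands back the failed task so the exception can be inspected.

```python
import asyncio

async def ok(delay):
    await asyncio.sleep(delay)
    return "ok"

async def boom(delay):
    await asyncio.sleep(delay)
    raise ValueError("boom")

async def with_gather():
    try:
        await asyncio.gather(ok(0), boom(0.01))
    except ValueError as exc:      # the exception propagates; no results returned
        return f"gather raised: {exc}"
    return "no error"

async def with_wait():
    tasks = [asyncio.create_task(ok(0.3)), asyncio.create_task(boom(0.01))]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    errors = [type(t.exception()).__name__ for t in done]
    for t in pending:              # the slower task is still pending; cancel it
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return errors, len(pending)

print(asyncio.run(with_gather()))
print(asyncio.run(with_wait()))
```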

Creating a task

Since a coroutine object cannot be run directly, the run_until_complete method wraps it in a Task object when it is registered with the event loop. A Task is a further encapsulation of the coroutine object and has more run states than the coroutine itself, such as pending, running, and finished. You can use these states to check on the execution of the coroutine.

The following example wraps coroutine objects in Task objects; it is modified from the code above.

import asyncio
import time
# Asynchronous function 1
async def task1(x):
    print("Task 1")
    await asyncio.sleep(2)
    print("Recovery mission 1")
    return x
# Asynchronous function 2
async def task2(x):
    print("Task 2")
    await asyncio.sleep(1)
    print("Recovery mission 2")
    return x
async def main():
    start_time = time.perf_counter()
    # Wrap the coroutine objects in Task objects
    coroutine1 = task1(1)
    task_1 = loop.create_task(coroutine1)
    coroutine2 = task2(2)
    task_2 = loop.create_task(coroutine2)
    ret_1, ret_2 = await asyncio.gather(task_1, task_2)
    print("Task 1 returns a value of", ret_1)
    print("Task 2 returns the value of", ret_2)
    print("Running time", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Because a Task object is an instance of a subclass of Future, the above code can also be modified as follows:

# task_2 = loop.create_task(coroutine2)
task_2 = asyncio.ensure_future(coroutine2)
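As a small check of this equivalence, the sketch below wraps one coroutine with asyncio.ensure_future() and another with asyncio.create_task() (the high-level helper available since Python 3.7, used here in place of loop.create_task as an assumption). The names job and compare_wrappers are hypothetical.

```python
import asyncio

async def job(x):
    await asyncio.sleep(0.01)
    return x * 10

async def compare_wrappers():
    t1 = asyncio.ensure_future(job(1))   # a coroutine passed in comes back as a Task
    t2 = asyncio.create_task(job(2))     # always creates a Task
    kinds = (isinstance(t1, asyncio.Task), isinstance(t2, asyncio.Task))
    results = await asyncio.gather(t1, t2)
    return kinds, results

print(asyncio.run(compare_wrappers()))
```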

The following code prints out the individual states of the Task objects.

import asyncio
import time
# Asynchronous function 1
async def task1(x):
    print("Task 1")
    await asyncio.sleep(2)
    print("Recovery mission 1")
    return x
# Asynchronous function 2
async def task2(x):
    print("Task 2")
    await asyncio.sleep(1)
    print("Recovery mission 2")
    return x
async def main():
    start_time = time.perf_counter()
    # Encapsulate task objects
    coroutine1 = task1(1)
    task_1 = loop.create_task(coroutine1)
    coroutine2 = task2(2)
    # task_2 = loop.create_task(coroutine2)
    task_2 = asyncio.ensure_future(coroutine2)
    # Both tasks start in the pending state
    print(task_1)
    print(task_2)
    # Check whether the tasks have completed
    print(task_1.done(), task_2.done())
    # Wait for the tasks to finish
    await task_1
    await task_2
    # Check the completion status again
    print(task_1.done(), task_2.done())
    # Retrieve the results
    print(task_1.result())
    print(task_2.result())
    print("Running time", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

await task_1 waits for the task to run to completion. Once it has finished, task_1.done() returns True and task_1.result() returns the return value.

Callback Return Value

When a coroutine has finished executing, you need to get its return value. One way to do this has just been demonstrated with the task.result() method, but that method only returns the result once the coroutine has finished running; if it has not finished, result() raises asyncio.InvalidStateError (invalid state error).
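The failure mode described above can be reproduced with a short sketch. The names slow_square and read_result_too_early are hypothetical.

```python
import asyncio

async def slow_square(x):
    await asyncio.sleep(0.01)
    return x ** 2

async def read_result_too_early():
    task = asyncio.create_task(slow_square(4))
    try:
        task.result()              # the task has not run yet
        early = "ready"
    except asyncio.InvalidStateError:
        early = "not ready"
    await task                     # now let it finish
    return early, task.result()

print(asyncio.run(read_result_too_early()))
```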

The second approach, which is the one generally used in practice, is to bind a callback with the add_done_callback() method.

import asyncio
import requests
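async def request_html():
    url = ''
    # requests.get() is a blocking call; it is used here only to produce a result
    res = requests.get(url)
    return res.status_code
def callback(task):
    print('Callbacks:', task.result())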
loop = asyncio.get_event_loop()
coroutine = request_html()
task = loop.create_task(coroutine)
# Bind callbacks
task.add_done_callback(callback)
print(task)
print("*"*100)
loop.run_until_complete(task)
print(task)

In the above code, the callback function is called when the coroutine finishes executing.

If the callback function takes more than one argument, use the partial function from the functools module.
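A minimal sketch of that pattern: the event loop calls a done callback with the task as its only argument, so any extra arguments are bound in advance with functools.partial. The names compute, callback_with_label, and the label string are hypothetical.

```python
import asyncio
from functools import partial

async def compute():
    await asyncio.sleep(0.01)
    return 42

def callback_with_label():
    seen = []

    def callback(label, task):
        # label was bound by partial; task is supplied by the event loop
        seen.append((label, task.result()))

    async def main():
        task = asyncio.create_task(compute())
        task.add_done_callback(partial(callback, "compute"))
        await task
        await asyncio.sleep(0)     # yield once more so scheduled callbacks have run
        return seen

    return asyncio.run(main())

print(callback_with_label())
```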

Closing the event loop

It is recommended to call the close() method of the event loop object once the work is finished, so that the loop object is cleaned up thoroughly.
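One common way to make sure close() is always reached is a try/finally block, sketched below. The names work and run_and_close are hypothetical, and asyncio.new_event_loop() is an assumption made so the example owns the loop it closes.

```python
import asyncio

async def work():
    await asyncio.sleep(0.01)
    return "done"

def run_and_close():
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(work())
    finally:
        loop.close()               # release the loop's resources even on error
    return result, loop.is_closed()

print(run_and_close())
```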

2. The crawler project for this section

The site scraped in this lesson hosts cosplay images, so its address appears only in the code.

The complete code is shown below:

import threading
import asyncio
import time
import requests
import lxml
from bs4 import BeautifulSoup
async def get(url):
    # requests.get() is a blocking call wrapped in a coroutine
    return requests.get(url)
async def get_html(url):
    print("Prepare for capture:", url)
    res = await get(url)
    return res.text
async def save_img(img_url):
    # thumbMid_5ae3e05fd3945 Replace small image with large image
    img_url = img_url.replace('thumb','thumbMid')
    img_url = "/" + img_url
    print("Picture download in progress:", img_url)
    res = await get(img_url)
    if res is not None:
        # Use a timestamp as the file name; the ./imgs directory must already exist
        with open(f'./imgs/{time.time()}.jpg', 'wb') as f:
            f.write(res.content)
            return img_url,"ok"
async def main(url_list):
    # Create one task per URL (at most 5 per batch)
    tasks = [asyncio.ensure_future(get_html(url)) for url in url_list]
    dones, pending = await asyncio.wait(tasks)
    for task in dones:
        html = task.result()
        soup = BeautifulSoup(html, 'lxml')
        divimg_tags = soup.find_all(attrs={'class': 'workimage'})
        for div in divimg_tags:
            # Assumes the <img> tag sits inside the div; adjust to the page's markup
            ret = await save_img(div.find("img")["data-original"])
            print(ret)
if __name__ == '__main__':
    urls = [f"/picture/lists/p/{page}" for page in range(1, 17)]
    total_page = len(urls) // 5 if len(urls) % 5 == 0 else len(urls) // 5 + 1
    # Slice the list of urls into batches of 5 to make collection easier
    for page in range(0, total_page):
        start_page = page * 5
        end_page = (page + 1) * 5
        # Event loop object
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(urls[start_page:end_page]))

Code description: the first thing to note in the above code is that the await keyword can only be followed by one of the following:

  • A native coroutine object;
  • An object whose __await__ method returns an iterator.
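The second case can be sketched with a small class. Delayed and use_awaitable are hypothetical names; delegating to the iterator of asyncio.sleep() is just one simple way to implement __await__.

```python
import asyncio

class Delayed:
    """An awaitable object: usable after await because it defines __await__."""

    def __init__(self, value, delay):
        self.value = value
        self.delay = delay

    def __await__(self):
        # Return the iterator of a coroutine that sleeps, then yields self.value
        return asyncio.sleep(self.delay, self.value).__await__()

async def use_awaitable():
    return await Delayed("payload", 0.01)

print(asyncio.run(use_awaitable()))
```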

That is why the coroutine get is nested inside the get_html function in the above code. In the main entry block, the URLs are sliced into groups of five for convenience, and each group is run in turn through a loop.
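The slicing arithmetic can be checked in isolation. The helper below is a hypothetical restatement of the same batching logic (16 pages in groups of 5), not part of the crawler itself.

```python
import math

def make_batches(items, size=5):
    # Number of batches, rounding up so a final partial batch is kept
    total = math.ceil(len(items) / size)
    return [items[i * size:(i + 1) * size] for i in range(total)]

pages = list(range(1, 17))
batches = make_batches(pages)
print(len(batches), batches[-1])
```

With 16 pages this gives three full batches and a final batch holding only page 16.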

The last two lines of the above code can, of course, be replaced directly with:

 # Event loop object
 # loop = asyncio.get_event_loop()
 #
 # loop.run_until_complete(main(urls[start_page:end_page]))
 asyncio.run(main(urls[start_page:end_page]))

This makes it easy to collect a batch of high-resolution images.
