SoFunction
Updated on 2024-11-18

IO Model for Python Concurrent Programming

The five IO models

To understand the IO models, first review four terms: synchronous, asynchronous, blocking, and non-blocking.

  • Synchronous IO
  • Asynchronous IO
  • Blocking IO
  • Non-blocking IO

The five I/O models include: blocking I/O, non-blocking I/O, signal-driven I/O (not commonly used), I/O multiplexing, and asynchronous I/O. Of these, the first four are known as synchronous I/O.

The degree of blocking of these five models runs from high to low: blocking I/O > non-blocking I/O > I/O multiplexing > signal-driven I/O > asynchronous I/O, so their efficiency runs from low to high in the same order.

1. Blocking I/O model

In Linux, all sockets are blocking by default, and almost all I/O interfaces (including socket interfaces) are blocking unless otherwise specified.

When thousands or even tens of thousands of client requests arrive at the same time, a thread pool or connection pool may relieve some of the pressure, but it cannot solve every problem. In short, the multi-threaded model handles small-scale workloads easily and efficiently, but it hits bottlenecks under large-scale load. Non-blocking interfaces are one way to try to address this.
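As a rough illustration of the blocking model combined with a thread pool, here is a minimal sketch. The names handle, serve, and demo are hypothetical, and the loopback address, pool size, and message contents are assumptions chosen only for the example. Each accept() and recv() call blocks the thread that makes it, which is why every connection ties up one worker.

```python
import socket
from concurrent.futures import ThreadPoolExecutor

def handle(conn):
    """Echo one message back; recv() blocks this worker thread until data arrives."""
    with conn:
        data = conn.recv(1024)
        conn.sendall(data)

def serve(server, pool, max_clients):
    """Accept max_clients connections; accept() blocks until a client connects."""
    for _ in range(max_clients):
        conn, _addr = server.accept()
        pool.submit(handle, conn)

def demo():
    """Run a tiny echo server and two sequential clients in one process."""
    server = socket.socket()
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(5)
    port = server.getsockname()[1]
    replies = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        acceptor = pool.submit(serve, server, pool, 2)
        for msg in (b"one", b"two"):
            with socket.create_connection(("127.0.0.1", port)) as client:
                client.sendall(msg)
                replies.append(client.recv(1024))
        acceptor.result()  # wait until both connections were accepted
    server.close()
    return replies

if __name__ == "__main__":
    print(demo())
```

With max_workers=4, a fifth simultaneous client would have to wait for a free worker, which is the bottleneck the paragraph above describes.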

2. Non-blocking I/O model

In non-blocking I/O, the user process must keep actively asking the kernel whether the data is ready. Because this polling consumes CPU while nothing is ready, the pure non-blocking I/O model is generally not recommended.

Non-blocking means not waiting. For example, when a socket connects to an address (connect) or receives data (recv), it waits by default until the connection succeeds or data arrives before performing subsequent operations.
If setblocking(False) is set, neither call waits any longer. Instead, each raises BlockingIOError, which you simply catch.
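The behaviour described above can be sketched locally with socket.socketpair(), which avoids any network dependency. The helper recv_polling and its interval and attempts parameters are hypothetical and exist only for this illustration. It shows both the immediate BlockingIOError and the "keep asking" loop that non-blocking I/O requires.

```python
import socket
import time

def recv_polling(sock, size=1024, interval=0.01, attempts=200):
    """Poll a non-blocking socket until data arrives; return (data, failed_polls)."""
    sock.setblocking(False)
    misses = 0
    for _ in range(attempts):
        try:
            return sock.recv(size), misses
        except BlockingIOError:
            # Nothing is ready yet: count the miss and ask again shortly
            misses += 1
            time.sleep(interval)
    raise TimeoutError("no data arrived")

def demo():
    left, right = socket.socketpair()
    with left, right:
        right.setblocking(False)
        try:
            right.recv(1024)          # no data has been sent yet
            first = "data"
        except BlockingIOError:
            first = "would block"     # raised immediately instead of waiting
        left.sendall(b"ping")
        data, _misses = recv_polling(right)
    return first, data

if __name__ == "__main__":
    print(demo())
```

Every failed poll is work that accomplishes nothing, which is the cost that I/O multiplexing is meant to remove.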

Asynchronous means notification: when an operation completes, a callback function (or some other follow-up action) is executed automatically. For example, if a crawler sends a request to baidu.com, the callback function runs once the request has completed.

3. Multiplexed I/O model (event-driven)

Event-loop-based asynchronous, non-blocking frameworks such as Twisted and Scrapy use this model to achieve concurrency in a single thread.

What is the role of IO multiplexing? It detects whether any of multiple sockets has changed state: connected successfully (writable) or received data (readable).

The operating system offers three mechanisms for detecting whether a socket has changed:

  • select: at most 1024 sockets by default; detects by looping over them.
  • poll: no limit on the number of monitored sockets; detects by looping over them (level-triggered).
  • epoll: no limit on the number of monitored sockets; uses callbacks instead of looping (supports edge-triggered mode; level-triggered by default).
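To make the idea of "detecting which sockets have changed" concrete, here is a small sketch using select.select from the standard library on two local socket pairs. The function names ready_to_read and demo and the timeout values are assumptions for illustration only.

```python
import select
import socket

def ready_to_read(socks, timeout=1.0):
    """Return the subset of socks that currently have data to read."""
    readable, _, _ = select.select(socks, [], [], timeout)
    return readable

def demo():
    a_send, a_recv = socket.socketpair()
    b_send, b_recv = socket.socketpair()
    with a_send, a_recv, b_send, b_recv:
        # Nothing has been sent, so no socket is readable (timeout=0: just poll once)
        before = ready_to_read([a_recv, b_recv], timeout=0)
        b_send.sendall(b"hello")
        # One call watches both sockets; only the one with data is reported
        after = ready_to_read([a_recv, b_recv])
        only_b = after == [b_recv]
        got = [s.recv(1024) for s in after]
    return before, only_b, got

if __name__ == "__main__":
    print(demo())
```

A single select call watches any number of sockets, so one thread can wait on all of them at once instead of polling each in turn.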

Python module:

Using IO multiplexing plus non-blocking sockets, a single thread can issue concurrent requests (for example, 100 requests in one thread).

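import select
import socket

# Create the socket
client = socket.socket()
# Make the calls that used to block non-blocking (they raise an error instead)
client.setblocking(False)

# Connect to Baidu: this call would normally block
try:
    # The connection attempt starts, but an error is raised immediately
    # (the hostname is left blank in the source)
    client.connect(('', 80))
except BlockingIOError as e:
    pass

# Detect a successful connection: the socket becomes writable
select.select([], [client], [])

# Tell Baidu what I want
client.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:\r\n\r\n')

# Wait to receive the reply from Baidu
chunk_list = []
while True:
    # Wait until the socket is readable so that recv does not raise BlockingIOError
    select.select([client], [], [])
    chunk = client.recv(8096)
    if not chunk:
        break
    chunk_list.append(chunk)

body = b''.join(chunk_list)
print(body.decode('utf-8'))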

selectors module

# Server
from socket import *
import selectors

sel = selectors.DefaultSelector()

def accept(server_fileobj, mask):
    conn, addr = server_fileobj.accept()
    conn.setblocking(False)
    # Watch the new connection for readability; read() is its callback
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    try:
        data = conn.recv(1024)
        if not data:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
            return
        # upper() is assumed here; the method name is missing in the source
        conn.send(data.upper() + b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


server_fileobj = socket(AF_INET, SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server_fileobj.bind(('127.0.0.1', 8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False)  # Set the socket interface to non-blocking
# Equivalent to appending server_fileobj to select's read list,
# and binding the callback function accept to it
sel.register(server_fileobj, selectors.EVENT_READ, accept)

while True:
    events = sel.select()  # Detect every registered fileobj that has data ready
    for sel_obj, mask in events:
        callback = sel_obj.data          # callback = accept
        callback(sel_obj.fileobj, mask)  # accept(server_fileobj, 1)

# Client
from socket import *
c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', 8088))

while True:
    msg = input('>>: ')
    if not msg:
        continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))

4. Asynchronous I/O

asyncio is a standard library module introduced in Python 3.4 with built-in support for asynchronous IO.

The programming model of asyncio is a message loop. We get a reference to an EventLoop from the asyncio module and then hand the coroutines that need to run to the EventLoop for execution, which gives us asynchronous IO.

The code for Hello world implemented with asyncio is as follows:

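import asyncio

# Note: @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11
@asyncio.coroutine
def hello():
    print("Hello world!")
    # Asynchronous call to asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# Get the EventLoop:
loop = asyncio.get_event_loop()
# Run the coroutine
loop.run_until_complete(hello())
loop.close()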

@asyncio.coroutine marks a generator as a coroutine type. We then hand this coroutine to the EventLoop for execution.

hello() first prints Hello world!. Then the yield from syntax lets us conveniently call another generator. Because asyncio.sleep() is also a coroutine, the thread does not wait for asyncio.sleep(); it suspends hello() and goes on to the next iteration of the message loop. When asyncio.sleep() returns, the thread gets the return value from yield from (None in this case) and then continues with the next statement.

Think of asyncio.sleep(1) as an IO operation that takes 1 second. During that time the main thread does not wait but goes on to execute the other runnable coroutines in the EventLoop, so concurrent execution is possible.

Let's wrap two coroutines in Tasks and try it:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.current_thread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.current_thread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Observe the execution process:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(pause for about 1 second)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

As the printed current thread name shows, the two coroutines are executed concurrently by the same thread.

If asyncio.sleep() is replaced with a real IO operation, multiple coroutines can be executed concurrently by a single thread.

We use asyncio's asynchronous network connections to fetch the home pages of sina, sohu and 163:

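import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        # Stop at the blank line that ends the headers, or at end of stream
        if line in (b'\r\n', b''):
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
# The three hostnames (sina, sohu, 163) are left blank in the source
tasks = [wget(host) for host in ['', '', '']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()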

The results of the implementation are as follows:

wget ...
wget ...
wget ...
(Wait a while.)
(print out the sohu header)
 header > HTTP/1.1 200 OK
 header > Content-Type: text/html
...
(print out the sina header)
 header > HTTP/1.1 200 OK
 header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(print out the 163 header)
 header > HTTP/1.0 302 Moved Temporarily
 header > Server: Cdn Cache Server V2.0
...

It can be seen that the 3 connections are completed concurrently by a single thread through coroutines.

async/await

With the @asyncio.coroutine decorator provided by asyncio, a generator can be marked as a coroutine type, and yield from can then be used inside it to call another coroutine and implement an asynchronous operation.

To simplify and better mark asynchronous IO, Python 3.5 introduced the new syntax async and await, which makes coroutine code more concise and readable.

Note that async and await are new syntax for coroutines. To use the new syntax, only two simple substitutions are needed:

  • Replace @asyncio.coroutine with async
  • Replace yield from with await

Let's compare the code from the previous section:

@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

Rewrite it with the new syntax as follows:

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

The rest of the code stays the same.
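On Python 3.7 and later, asyncio.run is a common alternative to managing the event loop by hand. The following is a minimal sketch of the same hello coroutine driven that way. The delay parameter and the return of r are additions made here for illustration, not part of the original example.

```python
import asyncio

async def hello(delay=1):
    print("Hello world!")
    # await suspends hello() and gives control back to the event loop
    r = await asyncio.sleep(delay)
    print("Hello again!")
    return r  # asyncio.sleep() yields no result, so r is None

if __name__ == "__main__":
    # asyncio.run creates an event loop, runs the coroutine, and closes the loop
    asyncio.run(hello())
```

This replaces the get_event_loop / run_until_complete / close sequence with a single call.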

Summary

asyncio provides complete asynchronous IO support;

Asynchronous operations need to be performed inside a coroutine through yield from;

Multiple coroutines can be wrapped into a group of Tasks and then executed concurrently.
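The last point can be sketched with the async/await syntax as follows. This is an illustrative example, not the article's own code: the names hello and main, the log list, and the delay values are assumptions, and asyncio.create_task requires Python 3.7 or later.

```python
import asyncio
import time

async def hello(name, delay, log):
    log.append("start " + name)
    await asyncio.sleep(delay)  # stands in for a real IO operation
    log.append("end " + name)

async def main(delay=1.0):
    log = []
    started = time.monotonic()
    # Wrap each coroutine in a Task so the event loop schedules them together
    tasks = [asyncio.create_task(hello(n, delay, log)) for n in ("a", "b")]
    await asyncio.gather(*tasks)
    return log, time.monotonic() - started

if __name__ == "__main__":
    log, elapsed = asyncio.run(main())
    print(log, round(elapsed, 1))
```

Both coroutines start before either finishes, and the total time is close to one delay rather than two, which is what single-threaded concurrency means here.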
