Syntactically, a coroutine looks like a generator: it is a function with the yield keyword in its body. In a coroutine, however, yield usually appears on the right-hand side of an assignment (e.g., datum = yield), and it may or may not produce a value; if there is no expression after the yield keyword, the generator yields None.
A coroutine may receive data from the caller, which supplies it with the .send(datum) method rather than the next(...) function.
The yield keyword can even be used without receiving or passing any data. Regardless of how the data flows, yield is a control-flow device that enables cooperative multitasking: a coroutine can yield control to a central scheduler, which then activates other coroutines.
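As a minimal sketch of the cooperative multitasking described above, the following round-robin scheduler resumes each task in turn whenever one of them yields control. The names worker and run_round_robin are hypothetical and exist only for this illustration.

```python
from collections import deque

def worker(name, steps, log):
    """Hypothetical task: record `steps` units of work, yielding after each."""
    for i in range(steps):
        log.append((name, i))
        yield  # hand control back to the scheduler; no data is exchanged

def run_round_robin(tasks):
    """Resume each task in turn until every task has finished."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # this task is done; do not reschedule it
        ready.append(task)

log = []
run_round_robin([worker('A', 2, log), worker('B', 3, log)])
print(log)  # [('A', 0), ('B', 0), ('A', 1), ('B', 1), ('B', 2)]
```

The interleaved log shows that neither task runs to completion before the other gets a turn, even though only one thread is involved.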
Basic Behavior of Generators Used as Coroutines
Here is the simplest possible coroutine:
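    def simple_coroutine():
        print('-> start')
        x = yield
        print('-> received', x)

    sc = simple_coroutine()
    next(sc)  # prime the coroutine: run to the first yield
    try:
        sc.send('zhexiao')  # prints '-> received zhexiao'
    except StopIteration:
        pass  # the coroutine body has run to the end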
Explanation:
1. A coroutine is defined as a generator function: its body contains the yield keyword.
2. yield is used in an expression. If the coroutine only receives data from the client, the value it yields is None; this is implicit, because there is no expression to the right of the yield keyword.
3. next(...) must be called first: a newly created generator has not started running and is not yet paused at a yield expression, so there is nothing to receive the sent data.
4. When send is called, the sent value becomes the value of the yield expression (here it is assigned to x), and the coroutine resumes, running until it reaches the next yield expression or terminates.
Note: send can deliver a value only when the coroutine is suspended at a yield expression (the GEN_SUSPENDED state), so we first call next(sc) to start the coroutine and pause it at the yield expression. Calling sc.send(None) has the same effect as next(sc).
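The note above can be checked with a short sketch. The echo coroutine is a hypothetical example: sending a real value before priming fails with TypeError, while send(None) primes the coroutine just as next() would.

```python
def echo():
    """Hypothetical coroutine that yields back the last value it was sent."""
    last = None
    while True:
        last = yield last

co = echo()
rejected = False
try:
    co.send('too early')   # the generator has not started yet
except TypeError:
    rejected = True

first = co.send(None)      # same effect as next(co): run to the first yield
second = co.send('hello')  # now the coroutine is suspended and accepts data
print(rejected, first, second)  # True None hello
```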
The Four States of a Coroutine
A coroutine is always in one of four states. The current state can be determined with the inspect.getgeneratorstate(...) function, which returns one of the following strings:
1. GEN_CREATED: waiting to start execution
2. GEN_RUNNING: currently being executed by the interpreter
3. GEN_SUSPENDED: paused at a yield expression
4. GEN_CLOSED: execution has completed
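The four states can be observed in order with a small sketch. The generator observed is hypothetical; it records its own state while running so that GEN_RUNNING shows up as well.

```python
import inspect

def observed():
    # While this body is executing, the generator reports GEN_RUNNING.
    states.append(inspect.getgeneratorstate(gen))
    yield

states = []
gen = observed()
states.append(inspect.getgeneratorstate(gen))  # before the first next()
next(gen)                                      # body runs and records its state
states.append(inspect.getgeneratorstate(gen))  # paused at the yield
gen.close()
states.append(inspect.getgeneratorstate(gen))  # after close()
print(states)
# ['GEN_CREATED', 'GEN_RUNNING', 'GEN_SUSPENDED', 'GEN_CLOSED']
```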
The first call to next(sc) is often described as "priming" the coroutine, i.e., advancing it to the first yield expression so that it is ready to receive values.
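Because forgetting to prime is an easy mistake, one convenience is a decorator that primes the coroutine on creation. The sketch below is only one way to do this; the names primed and running_total are hypothetical. Such a decorator should not be combined with yield from, which primes the subgenerator itself.

```python
from functools import wraps

def primed(func):
    """Hypothetical decorator: advance the new generator to its first yield."""
    @wraps(func)
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # prime
        return gen
    return start

@primed
def running_total():
    total = 0
    while True:
        value = yield total
        total += value

rt = running_total()               # already primed, no next() needed
results = [rt.send(5), rt.send(7)]
print(results)  # [5, 12]
```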
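    import inspect

    def simple_coroutine(a):
        print('-> start')
        b = yield a
        print('-> received', a, b)
        c = yield a + b
        print('-> received', a, b, c)

    # run
    sc = simple_coroutine(5)
    print(inspect.getgeneratorstate(sc))  # GEN_CREATED
    next(sc)        # prints '-> start', yields 5
    sc.send(6)      # prints '-> received 5 6', yields 11
    try:
        sc.send(7)  # prints '-> received 5 6 7'
    except StopIteration:
        pass        # the coroutine body has finished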
Example: Computing a Running Average with a Coroutine
    def averager():
        total = 0.0
        count = 0
        avg = None
        while True:
            num = yield avg
            total += num
            count += 1
            avg = total / count

    # run
    ag = averager()
    # Prime the coroutine
    print(next(ag))     # None
    print(ag.send(10))  # 10.0
    print(ag.send(20))  # 15.0
Explanation:
1. Calling next(ag) advances the coroutine to the yield expression, which yields the initial value of avg, None.
2. At this point, the coroutine is paused at the yield expression.
3. Calling send() resumes the coroutine: the sent value is assigned to num, and the new value of avg is computed.
4. The loop then reaches yield again; the updated avg becomes the return value of send(), which print displays.
Terminating Coroutines and Exception Handling
An unhandled exception inside a coroutine propagates to the caller of next or send (i.e., the code that triggered the coroutine).
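A brief sketch of this behavior, using a hypothetical summer coroutine: sending a value it cannot add raises TypeError in the caller, and the coroutine is closed afterwards.

```python
import inspect

def summer():
    """Hypothetical coroutine that keeps a running sum of what it is sent."""
    total = 0
    while True:
        value = yield total
        total += value  # raises TypeError if value is not a number

co = summer()
next(co)
co.send(10)
try:
    co.send('spam')     # the TypeError surfaces here, in the caller
except TypeError:
    state_after_error = inspect.getgeneratorstate(co)
print(state_after_error)  # GEN_CLOSED
```

Once the exception has escaped, the coroutine cannot be resumed; any further send raises StopIteration.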
One way to terminate a coroutine is to send it a sentinel value that tells it to exit. Built-in constants such as None and Ellipsis are often used as sentinels.
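The sentinel approach might look like the following sketch, in which a hypothetical collector coroutine stops as soon as it receives None. When the body finishes, the caller sees StopIteration, which also carries any value the coroutine returned.

```python
def collector():
    """Hypothetical coroutine: gather items until the None sentinel arrives."""
    items = []
    while True:
        item = yield
        if item is None:  # sentinel: stop collecting
            break
        items.append(item)
    return items

co = collector()
next(co)  # prime
co.send('a')
co.send('b')
try:
    co.send(None)         # sentinel ends the loop, so the coroutine returns
except StopIteration as stop:
    result = stop.value
print(result)  # ['a', 'b']
```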
Explicitly Sending Exceptions to a Coroutine
Starting with Python 2.5, client code can explicitly send exceptions into a coroutine by calling two methods on the generator object.
generator.throw(exc_type[, exc_value[, traceback]])
Causes the generator to raise the specified exception at the yield expression where it is paused. If the generator handles the exception, execution continues to the next yield expression, and the value yielded becomes the return value of the generator.throw call. If the generator does not handle the exception, it propagates to the caller's context.
generator.close()
Causes the generator to raise GeneratorExit at the yield expression where it is paused. If the generator does not handle this exception, or if it raises StopIteration (usually by running to the end), no error is reported to the caller. On receiving GeneratorExit, the generator must not yield a value; otherwise the interpreter raises RuntimeError. Any other exception raised by the generator propagates to the caller.
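The two outcomes of close() can be sketched as follows. Both generators are hypothetical: polite cleans up and lets GeneratorExit continue, while stubborn yields again after receiving it, which the interpreter reports as RuntimeError.

```python
def polite():
    try:
        while True:
            yield
    except GeneratorExit:
        cleanup_log.append('polite: cleaned up')
        raise  # re-raise (or simply return) so close() ends quietly

def stubborn():
    try:
        yield
    except GeneratorExit:
        yield 'still here'  # yielding after GeneratorExit is an error

cleanup_log = []

p = polite()
next(p)
p.close()      # no error reaches the caller

s = stubborn()
next(s)
try:
    s.close()  # the generator yielded a value instead of exiting
except RuntimeError:
    cleanup_log.append('stubborn: RuntimeError')

print(cleanup_log)
# ['polite: cleaned up', 'stubborn: RuntimeError']
```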
Example of exception handling:
    class DemoException(Exception):
        """Custom exception."""

    def handle_exception():
        print('-> start')
        while True:
            try:
                x = yield
            except DemoException:
                print('-> run demo exception')
            else:
                print('-> received x:', x)
        raise RuntimeError('this line should never run')

    he = handle_exception()
    next(he)
    he.send(10)              # received x: 10
    he.send(20)              # received x: 20
    he.throw(DemoException)  # run demo exception
    he.send(40)              # received x: 40
    he.close()
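If an exception that the coroutine does not handle is thrown into it, the coroutine terminates and the exception propagates to the caller:
    he.throw(Exception)  # not caught by handle_exception: raised in the caller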
Getting a Coroutine's Return Value with yield from
To return a value, a coroutine must terminate normally; the generator object then raises StopIteration, and the value attribute of the exception object holds the returned value.
The yield from construct catches StopIteration internally. Beyond just catching the exception, the interpreter makes its value attribute the value of the yield from expression.
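To make the difference concrete, this sketch retrieves the same return value twice: once by catching StopIteration by hand, and once through yield from. The names compute and delegator are hypothetical.

```python
def compute():
    yield 'working'
    return 42

# Manual approach: catch StopIteration and read its value attribute.
gen = compute()
next(gen)
try:
    next(gen)
except StopIteration as stop:
    manual_result = stop.value

# With yield from, the interpreter does the catching for us.
def delegator(out):
    result = yield from compute()
    out.append(result)

out = []
yielded = list(delegator(out))
print(manual_result, yielded, out)  # 42 ['working'] [42]
```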
Basic Usage of yield from
When a generator gen uses yield from subgen(), subgen takes over and yields its values straight to the caller of gen; in effect, the caller drives subgen directly. Meanwhile, gen is blocked, waiting for subgen to terminate.
The following two functions do the same thing; the second uses yield from and is more concise:
    def gen():
        for c in 'AB':
            yield c

    print(list(gen()))      # ['A', 'B']

    def gen_new():
        yield from 'AB'

    print(list(gen_new()))  # ['A', 'B']
The first thing a yield from x expression does with x is call iter(x) to obtain an iterator, so x can be any iterable. This is only the most basic use of yield from.
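Because any iterable is accepted, yield from can flatten a mix of strings, ranges, and tuples. The chain function below is a hypothetical helper written for this sketch, similar in spirit to itertools.chain.

```python
def chain(*iterables):
    """Hypothetical helper: yield every item of every iterable, in order."""
    for it in iterables:
        yield from it  # iter(it) is called implicitly

flattened = list(chain('AB', range(3), (True,)))
print(flattened)  # ['A', 'B', 0, 1, 2, True]
```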
Advanced Usage of yield from
The main purpose of yield from is to open a two-way channel between the outermost caller and the innermost subgenerator, so that values can be sent and yielded directly between them, and exceptions can be thrown in directly, without adding a lot of exception-handling boilerplate to the coroutines in between.
yield from Terminology
Delegating generator: the generator function that contains the yield from expression.
Subgenerator: the generator obtained from the expression that follows yield from.
Explanation:
1. While the delegating generator is paused at a yield from expression, the caller can send data directly to the subgenerator.
2. The subgenerator in turn yields values directly to the caller.
3. When the subgenerator returns, the interpreter raises StopIteration with the return value attached to the exception object; at that point the delegating generator resumes.
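The channel works for exceptions as well as values. In this sketch (all names are hypothetical), the generator in the middle contains no forwarding code at all, yet both send and throw reach the subgenerator.

```python
class Reset(Exception):
    """Hypothetical signal used only in this sketch."""

def accumulator():
    total = 0
    while True:
        try:
            value = yield total
        except Reset:
            total = 0          # handled inside the subgenerator
        else:
            if value is None:
                return total   # becomes the value of yield from
            total += value

def middle(results):
    final = yield from accumulator()
    results.append(final)

results = []
m = middle(results)
next(m)             # prime: the subgenerator yields 0
a = m.send(5)       # 5
b = m.send(10)      # 15
c = m.throw(Reset)  # forwarded to the subgenerator, which resets: 0
d = m.send(3)       # 3
try:
    m.send(None)    # the subgenerator returns and middle finishes
except StopIteration:
    pass
print(a, b, c, d, results)  # 5 15 0 3 [3]
```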
Advanced Example
    from collections import namedtuple

    ResClass = namedtuple('Res', 'count average')

    # Subgenerator
    def averager():
        total = 0.0
        count = 0
        average = None
        while True:
            term = yield
            if term is None:
                break
            total += term
            count += 1
            average = total / count
        return ResClass(count, average)

    # Delegating generator
    def grouper(storages, key):
        while True:
            # Get the value returned by averager()
            storages[key] = yield from averager()

    # Client code
    def client():
        process_data = {
            'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
            'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]
        }
        storages = {}
        for k, v in process_data.items():
            # Get the coroutine
            coroutine = grouper(storages, k)
            # Prime the coroutine
            next(coroutine)
            # Send data to the coroutine
            for dt in v:
                coroutine.send(dt)
            # Terminate the current averager
            coroutine.send(None)
        print(storages)

    # run
    client()
Explanation:
1. Each iteration of the outer for loop creates a new grouper instance and binds it to the coroutine variable; grouper is the delegating generator.
2. next(coroutine) primes the delegating generator grouper, which enters its while True loop, calls the subgenerator averager, and pauses at the yield from expression.
3. The inner for loop calls coroutine.send(dt), passing each value directly to the subgenerator averager. Meanwhile, the current grouper instance (coroutine) stays suspended at the yield from expression.
4. When the inner loop ends, the grouper instance is still suspended at yield from, so the assignment to storages[key] in the body of grouper has not yet run.
5. coroutine.send(None) terminates the averager subgenerator, which raises StopIteration with the returned data in the exception's value attribute. yield from catches the StopIteration and assigns that value to storages[key]. The while True loop in grouper then creates a fresh averager and pauses again, so the StopIteration never reaches the client.
The Meaning of yield from
- Any values that the subgenerator yields are passed directly to the caller of the delegating generator (i.e., the client code).
- Any values sent to the delegating generator using send() are passed directly to the subgenerator. If the sent value is None, the subgenerator's __next__() method is called; otherwise, the subgenerator's send() method is called. If the call raises StopIteration, the delegating generator resumes. Any other exception propagates to the delegating generator.
- return expr in a generator (or subgenerator) causes StopIteration(expr) to be raised when the generator exits.
- The value of the yield from expression is the first argument of the StopIteration exception raised by the subgenerator when it terminates.
- Exceptions other than GeneratorExit thrown into the delegating generator are passed to the subgenerator's throw() method. If that call raises StopIteration, the delegating generator resumes. Any other exception propagates to the delegating generator.
- If GeneratorExit is thrown into the delegating generator, or its close() method is called, then the close() method of the subgenerator is called, if it has one. If that call raises an exception, the exception propagates to the delegating generator; otherwise, GeneratorExit is raised in the delegating generator.
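The send-related rules above can be approximated by hand. The sketch below is a deliberate simplification, not the full expansion: it covers only the forwarding of sent values and the capture of the return value, and it ignores throw() and close(). All function names are hypothetical.

```python
def subgen():
    total = 0
    while True:
        x = yield total
        if x is None:
            return total
        total += x

def with_yield_from(results):
    results.append((yield from subgen()))

def expanded(results):
    """Simplified hand-written equivalent (no throw/close handling)."""
    _i = iter(subgen())
    try:
        _y = next(_i)              # prime the subgenerator
    except StopIteration as _e:
        _r = _e.value
    else:
        while True:
            _s = yield _y          # pass the subgenerator's value to the caller
            try:
                _y = _i.send(_s)   # forward the caller's value
            except StopIteration as _e:
                _r = _e.value      # return value of the subgenerator
                break
    results.append(_r)

def drive(gen_func):
    results = []
    g = gen_func(results)
    outputs = [next(g), g.send(1), g.send(2)]
    try:
        g.send(None)
    except StopIteration:
        pass
    return outputs, results

print(drive(with_yield_from))  # ([0, 1, 3], [3])
print(drive(expanded))         # ([0, 1, 3], [3])
```

Seeing how much code is needed even for this reduced case helps explain why yield from was added to the language.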
Use Cases
Coroutines are a natural way to express many algorithms, such as simulations, games, asynchronous I/O, and other forms of event-driven programming or cooperative multitasking. Coroutines are a fundamental building block of the asyncio package. The simulation below shows how concurrent activities can be implemented with coroutines instead of threads.
In the field of simulation, the term process refers to the activity of an entity in the model, not to an operating system process. A simulation process can be implemented as an operating system process, but it is usually implemented with a thread or a coroutine.
Taxi Example
    import collections

    # The time field is the simulation time at which the event occurs.
    # The proc field is the number of the taxi process instance.
    # The action field is a string describing the activity.
    Event = collections.namedtuple('Event', 'time proc action')

    def taxi_process(proc_num, trips_num, start_time=0):
        """
        Yield an event at each state change, ceding control to the simulator.
        :param proc_num:
        :param trips_num:
        :param start_time:
        :return:
        """
        time = yield Event(start_time, proc_num, 'leave garage')
        for i in range(trips_num):
            time = yield Event(time, proc_num, 'pick up people')
            time = yield Event(time, proc_num, 'drop off people')
        yield Event(time, proc_num, 'go home')

    # run
    t1 = taxi_process(1, 1)
    a = next(t1)
    print(a)  # Event(time=0, proc=1, action='leave garage')
    b = t1.send(a.time + 6)
    print(b)  # Event(time=6, proc=1, action='pick up people')
    c = t1.send(b.time + 12)
    print(c)  # Event(time=18, proc=1, action='drop off people')
    d = t1.send(c.time + 1)
    print(d)  # Event(time=19, proc=1, action='go home')
Simulated Console Controlling Three Taxis Asynchronously
    import collections
    import queue
    import random

    # The time field is the simulation time at which the event occurs.
    # The proc field is the number of the taxi process instance.
    # The action field is a string describing the activity.
    Event = collections.namedtuple('Event', 'time proc action')

    def taxi_process(proc_num, trips_num, start_time=0):
        """
        Yield an event at each state change, ceding control to the simulator.
        :param proc_num:
        :param trips_num:
        :param start_time:
        :return:
        """
        time = yield Event(start_time, proc_num, 'leave garage')
        for i in range(trips_num):
            time = yield Event(time, proc_num, 'pick up people')
            time = yield Event(time, proc_num, 'drop off people')
        yield Event(time, proc_num, 'go home')

    class SimulateTaxi(object):
        """
        Simulated taxi console
        """

        def __init__(self, proc_map):
            # PriorityQueue that holds the scheduled events.
            # Tuples are ordered by their first element (tuple[0]) by default.
            self.events = queue.PriorityQueue()
            # proc_map is a dictionary; dict() builds a local copy.
            self.procs = dict(proc_map)

        def run(self, end_time):
            """
            Schedule and display events until the end time is reached.
            :param end_time:
            :return:
            """
            # Prime every taxi and schedule its first event
            for _, taxi_gen in self.procs.items():
                leave_evt = next(taxi_gen)
                self.events.put(leave_evt)

            # Main loop of the simulation
            simulate_time = 0
            while simulate_time < end_time:
                if self.events.empty():
                    print('*** end of events ***')
                    break

                # Take the earliest scheduled event
                current_evt = self.events.get()
                simulate_time, proc_num, action = current_evt
                print('taxi:', proc_num, ', at time:', simulate_time, ', ', action)

                # Prepare the next event for this taxi
                proc_gen = self.procs[proc_num]
                next_simulate_time = simulate_time + self.compute_duration()

                try:
                    next_evt = proc_gen.send(next_simulate_time)
                except StopIteration:
                    del self.procs[proc_num]
                else:
                    self.events.put(next_evt)
            else:
                msg = '*** end of simulation time: {} events pending ***'
                print(msg.format(self.events.qsize()))

        @staticmethod
        def compute_duration():
            """
            Randomly generate the time until the next event.
            :return:
            """
            duration_time = random.randint(1, 20)
            return duration_time

    # Create three taxis; none of them has left the garage yet
    taxis = {i: taxi_process(i, (i + 1) * 2, i * 5) for i in range(3)}

    # Run the simulation
    st = SimulateTaxi(taxis)
    st.run(100)