SoFunction
Updated on 2024-11-18

The relationship between join and task_done in Queue queue and its description

join() with task_done()

Mostly on the web the original statement about the end of join() vs task_done() is this:

  • Queue.task_done() After completing a job, the Queue.task_done() function sends a signal to the queue where the task has been completed
  • () In practice this means waiting until the queue is empty before performing another operation

But probably many people still don't quite understand it, so here is my own understanding of how the two are related.

understandings

If the thread every take from the queue, but did not execute task_done (), then join can not determine whether the queue is finished or not, in the last execution of a join () can not wait for the result, will always hang.

It can be understood that every time task_done delete an element from the queue, so that in the final join according to the length of the queue is zero to determine whether the queue is over, so as to execute the main thread.

See an example of your own writing below:

The following example will hang indefinitely where join() is because join is waiting for the queue to empty, but since there is no task_done, it thinks the queue is not empty yet and keeps waiting.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''threading test'''
import threading
import queue
from time import sleep
# The reason why threads are used is because threads can START and then continue to execute the main thread behind them, can PUT data, if not threads directly in the GET blocking.
class Mythread():
 def __init__(self,que):
 .__init__(self)
  = que
 def run(self):
 while True:
 sleep(1)
 if (): # judgment to get in front, this can be, otherwise the queue will be empty after the last fetch, direct break, go not print
 break
 item = ()
 print(item,'!')
 #.task_done()
 return
que = ()
tasks = [Mythread(que) for x in range(1)]
for x in range(10):
 
 (x) # Rapid production
for x in tasks:
 t = Mythread(que) #Passing the same queue into 2 threads
 ()
 
()
 
print('---success---')

If you remove the .task_done() comment, you will finish executing the main program without any problems.

This is the meaning of the phrase "the Queue.task_done() function sends a signal to the queue where the task has been completed", which allows the join() function to determine how much of the queue is left and whether it has been emptied.

And in fact, if we look at the queue's source code, we can see that it does indeed execute an unfinished queue minus one:

 def task_done(self):
 '''Indicate that a formerly enqueued task is complete.
 Used by Queue consumer threads. For each get() used to fetch a task,
 a subsequent call to task_done() tells the queue that the processing
 on the task is complete.
 If a join() is currently blocking, it will resume when all items
 have been processed (meaning that a task_done() call was received
 for every item that had been put() into the queue).
 Raises a ValueError if called more times than there were items
 placed in the queue.
 '''
 with self.all_tasks_done:
 unfinished = self.unfinished_tasks - 1
 if unfinished <= 0:
 if unfinished < 0:
 raise ValueError('task_done() called too many times')
 self.all_tasks_done.notify_all()
 self.unfinished_tasks = unfinished
 

Rapid production - rapid consumption

The demo code above is a fast production - slow consumption scenario, we can directly use task_done() in conjunction with join() to allow empty() to determine whether the queue has ended.

Of course, queue we can correctly determine whether it has been emptied, but the get queue in the thread is not known, if there is nothing to tell it that the queue is empty, so get will continue to block, then we need to add a judgment in the get program, if empty() holds, break to exit the loop, otherwise get() will still keep blocking.

Slow production - fast consumption

However, if the producer speed and consumer speed is comparable, or the production speed is less than the consumption speed, then rely on task_done () to achieve the queue minus one is not reliable, the queue will be in demand from time to time in the state of supply is often empty, so the use of empty to judge is not reliable.

This situation then leads to a situation where join can tell that the queue is over, but you can't rely on empty() in the thread to tell if the thread is ready to end.

We can stuff a specific "marker" at the end of each thread in the consumption queue, and judge it at the time of consumption, if we get such a "marker", we can determine the end of the queue, because the production queue is over, and there will be no new additions. The queue is over because the production queue is over and no more will be added to it.

The code is as follows:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''threading test'''
import threading
import queue
from time import sleep
# The reason why threads are used is because threads can START and then continue to execute the main thread behind them, can PUT data, if not threads directly in the GET blocking.
class Mythread():
 def __init__(self,que):
 .__init__(self)
  = que
 def run(self):
 while True:
 item = ()
 .task_done() # here to be put before the judgment, otherwise take the last last time has been empty, directly break, task_done can not be executed, join () judgment queue has not been finished
 if item == None:
 break
 print(item,'!')
return
que = ()
tasks = [Mythread(que) for x in range(1)]
 # Rapid production
for x in tasks:
 t = Mythread(que) #Passing the same queue into 2 threads
 ()
for x in range(10):
 sleep(1)
 (x)
for x in tasks:
 (None)
()
print('---success---')

point of attention

Never use task_done() when the put queue finishes, or you will get an error:

task_done() called too many times

This is because the method simply represents a token that is executed after a successful get.

summarize

The above is a personal experience, I hope it can give you a reference, and I hope you can support me more.