
Discovering Python Step by Step - Using Python Queues - 02 - Extracting Elements from a Queue. Also: Tasks

How to extract elements from a queue in Python
In this article, I would like to continue using Python queues in a concurrent execution setting in a discovery fashion, starting with basic Python queues and adding complexity to meet specific requirements. We will find small issues with threading, and we will try to circumvent them to end up with properly functioning code.

Background

In my previous article about queues, I described the process of adding elements to a queue. We created an insertion thread and checked what happened, with and without blocking, when we filled the queue with elements from a separate thread. The thread adding elements stalled once we had added enough elements to fill the queue. In this article, we will continue modifying the code to add the functionality to extract elements from the queue for processing. I will also introduce the concept of a queue task.

Extracting Elements From a Queue

As with item insertion, there is a Python method to extract items from a queue (pop elements), called get.

From the Python documentation:

Quote:
Queue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Prior to 3.0 on POSIX systems, and for all versions on Windows, if block is true and timeout is None, this operation goes into an uninterruptible wait on an underlying lock. This means that no exceptions can occur, and in particular a SIGINT will not trigger a KeyboardInterrupt.

As with the insertion, there is a non-blocking method for element extraction:

Python
Queue.get_nowait()

This call is equivalent to...

Python
Queue.get(block=False)

In this case, when no element can be extracted from the queue, the exception raised is queue.Empty.

My recommendation is to use a blocking get, and provide the call with an appropriate timeout (in seconds), to avoid issues with the uninterruptible wait. For the sake of simplicity, I will only start following my own recommendation in a later article.
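To illustrate both styles before we move on, here is a small standalone snippet (not part of the article's main program) showing how each call signals an empty queue; the queue q and its single item are made up for the example:

Python
import queue

q = queue.Queue(5)
q.put("only item")

# Non-blocking extraction: raises queue.Empty immediately if nothing is available
try:
    item = q.get_nowait()
    print("Got: {}".format(item))
except queue.Empty:
    print("Queue was empty")

# Blocking extraction with a timeout: waits at most 2 seconds, then raises queue.Empty
try:
    item = q.get(block=True, timeout=2)
except queue.Empty:
    print("No item arrived within 2 seconds")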

Adding Queue Element Processing

I made some changes to the code from my previous article on queues:

  • I added a new named thread for element extraction and processing to the code (a consumer thread). The associated method for the thread is process_items, and it accepts, as a parameter, the queue from which it will extract items for processing.
  • I modified the logs so we can identify the thread from which we log.
  • I changed the number of inserted items to 10, instead of 100, to reduce the time to see the program end.
  • I made the main thread join to the consumer thread, to wait for its completion.

For the moment, the actual processing code we will use in our program is not really relevant. We will simulate the time consumed processing the elements by performing a sleep on the processing thread. The processing method includes an infinite loop (while True), to make sure that the thread continues extracting elements when possible. I will improve this point in a later step, to add some way to stop this thread in a controlled manner.

The new code is as follows:

Python
import queue
import threading
import time

def add_items(processing_queue, num_items):
    for x in range(num_items):
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        time.sleep(.1)    
        print("add_items - Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))

def process_items(processing_queue):
    while True:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        print("Processing item: {}".format(x))
        # simulate processing time. Much slower than insertion
        time.sleep(1)
        processing_queue.task_done()
        print("process_items - Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))

def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, 
            args=(test_queue, 10), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), 
                name="process_items_thread")
    t_process.start()

    t_add.join()
    t_process.join()
    
if __name__ == "__main__":
    main()

On my computer, the execution results in this sequence of logs:

Quote:

Adding item: 0
Processing item: 0
add_items - Processing queue size: 0. Remaining tasks: 1
Adding item: 1
add_items - Processing queue size: 1. Remaining tasks: 2
Adding item: 2
add_items - Processing queue size: 2. Remaining tasks: 3
Adding item: 3
add_items - Processing queue size: 3. Remaining tasks: 4
Adding item: 4
add_items - Processing queue size: 4. Remaining tasks: 5
Adding item: 5
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 1
Adding item: 6
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 2
Adding item: 7
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 3
Adding item: 8
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Adding item: 9
Processing item: 4
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 5
process_items - Processing queue size: 4. Remaining tasks: 4
Processing item: 6
process_items - Processing queue size: 3. Remaining tasks: 3
Processing item: 7
process_items - Processing queue size: 2. Remaining tasks: 2
Processing item: 8
process_items - Processing queue size: 1. Remaining tasks: 1
Processing item: 9
process_items - Processing queue size: 0. Remaining tasks: 0

<AT THIS POINT, THE PROGRAM HALTS: THE PROCESSING THREAD BLOCKS FOREVER ON AN EMPTY QUEUE>

You will see that, with a consumer thread available for item extraction and processing, the producer thread was able to go beyond the 5th insertion. The producer thread inserted the first 5 elements quite fast (~10 items per second), but from then on, it had to settle for the pace set by the consumer thread (1 item per second), as the queue was kept full at all times. All the items have been produced and consumed. However, the consumer thread was written with an infinite loop, and we must correct this so the processing thread stops properly when all work is complete.

Before doing that, I want to introduce the concept of a task.

Task Mechanics

If you observe the code of the process_items method, I added this line at the end of the 1-second processing code simulation sleep:

Python
processing_queue.task_done()

In this and the previous article, I also printed the number of unfinished_tasks. To understand why I am using the task_done call, we need to have another look at the Python documentation:

Quote:
Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

The main reason to use Python queues is to allow multiple producers and consumers to insert/extract elements from the queue in a thread-safe manner. Producers insert elements, and consumers extract them. The consumers are then supposed to perform a task with each item: processing it.

The Python queue implementation provides some counter management, related to the number of items inserted in the queue:

  • Item count: The actual number of items in the queue, updated as they are inserted and extracted using put and get. The queue uses this count to enforce the limit on the number of elements (the queue maxsize), so it knows when the queue is empty or full.
  • Unfinished task count: Similar to the item count, but with a slight difference: when we insert an item into the queue, the number of unfinished tasks is incremented, but when an item is extracted from the queue, the number of unfinished tasks is left unchanged. It only decreases by one when a call to Queue.task_done() is made. This forces us to ensure that, in our code, the consumer thread calls the task_done method exactly once per extracted element, when the processing task is done with the item, and not before. Also make sure that no uncaught exception jumps over the call to this method (see the try/finally sketch at the end of this section). The maximum number of unfinished tasks can be calculated as:
    Quote:

    Max number of unfinished tasks = ("the queue maxsize value") + min("the number of threads extracting and processing elements", "the queue maxsize value")

In our example, the queue maxsize was 5, and we had 1 consumer thread extracting items, leaving us with 5 + min(1, 5) = 6 as the maximum number of unfinished tasks, in accordance with the logs shown earlier.

The Python queue implementation uses thread locking mechanisms to enforce exclusive access to these counters, so that the count information is modified in a thread-safe manner. The same lock object is used for the get, put and task_done methods. From our own code, we need to inform the queue class that a task (the processing of an item in the consumer thread) is finished, with a call to Queue.task_done(). Implemented like this, the queue object can serve as a centralised place for communication between threads: the producer threads, the consumer threads, and any threads wishing to join on the completion of the tasks in the queue.
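As an aside, a simple way to guarantee that an exception raised while processing does not jump over the task_done() call is a try/finally block. This is only a minimal sketch, not the program we are building in this article, and handle_item stands in for whatever processing your consumer performs:

Python
def process_items(processing_queue):
    while True:
        x = processing_queue.get()
        try:
            # Processing happens here; if it raises, the finally
            # block below still runs
            handle_item(x)  # hypothetical processing function
        finally:
            # Exactly one task_done() per successful get()
            processing_queue.task_done()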

Stopping the Consumer Thread by Checking the Number of Unfinished Tasks

Checking the Number of Unfinished Tasks

The simplest solution to stop the consumer thread loop when all the tasks in the queue have been completed is to use Queue.unfinished_tasks. If the value of this property of the queue object is greater than zero, that means that there are tasks still to be completed, and we should still try to extract items from the queue.

We will modify the code in the process_items method, to stop the loop when it detects that the queue is done with all tasks:

Python
def process_items(processing_queue):
    while processing_queue.unfinished_tasks > 0:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        print("Processing item: {}".format(x))
        # simulate processing time. Much slower than insertion
        time.sleep(1)
        processing_queue.task_done()
        print("process_items - Processing queue size: {}. 
        Remaining tasks: {}".format(processing_queue.qsize(), 
        processing_queue.unfinished_tasks))

This way, the resulting code does what we expect:

Quote:

Adding item: 0
Processing item: 0
add_items - Processing queue size: 0. Remaining tasks: 1
Adding item: 1
add_items - Processing queue size: 1. Remaining tasks: 2
Adding item: 2
add_items - Processing queue size: 2. Remaining tasks: 3
Adding item: 3
add_items - Processing queue size: 3. Remaining tasks: 4
Adding item: 4
add_items - Processing queue size: 4. Remaining tasks: 5
Adding item: 5
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 1
Adding item: 6
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 2
Adding item: 7
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 3
Adding item: 8
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 4
Adding item: 9
add_items - Processing queue size: 5. Remaining tasks: 6
process_items - Processing queue size: 5. Remaining tasks: 5
Processing item: 5
process_items - Processing queue size: 4. Remaining tasks: 4
Processing item: 6
process_items - Processing queue size: 3. Remaining tasks: 3
Processing item: 7
process_items - Processing queue size: 2. Remaining tasks: 2
Processing item: 8
process_items - Processing queue size: 1. Remaining tasks: 1
Processing item: 9
process_items - Processing queue size: 0. Remaining tasks: 0

<THE PROGRAM FINISHES PROPERLY NOW, NO HALT DUE TO THE INFINITE LOOP>

This code seems to work.

The Number of Unfinished Tasks Goes to Zero if the Producer Is Too Slow

At first sight, checking the number of unfinished tasks so a consumer thread knows when all work is done sounds like a good idea. There are cases, though, where the consumer thread will stop prematurely if the producer thread does not keep the queue stocked with items at all times. If the item production rate is much slower than the consumer's processing rate, the consumer thread will see an empty queue at some point, and an unfinished_tasks value of zero. The relative insertion/extraction rates affect when the unfinished task count reaches zero, so this value alone is not enough to mark completion.

Let's see this happening. I will swap the sleep periods in the loops of the producer and consumer threads: I will make the producer thread produce an item every second, and make the consumer much faster, processing an item in 0.1 seconds. I will also add some more logging, to see exactly when the threads stop:

Python
import queue
import threading
import time

def add_items(processing_queue, num_items):
    for x in range(num_items):
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        time.sleep(1)
        print("add_items - Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))
    print("add_items - Exiting thread")

def process_items(processing_queue):
    while processing_queue.unfinished_tasks > 0:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        print("Processing item: {}".format(x))
        # simulate processing time. Much faster than insertion
        time.sleep(0.1)
        processing_queue.task_done()
        print("process_items - Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))
    print("process_items - Exiting thread")

def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, args=(test_queue, 10), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), name="process_items_thread")
    t_process.start()

    t_add.join()
    t_process.join()
    print("main - Finished")
    

if __name__ == "__main__":
    main()

Executing this code results in:

Quote:

Adding item: 0
Processing item: 0
process_items - Processing queue size: 0. Remaining tasks: 0
process_items - Exiting thread
add_items - Processing queue size: 0. Remaining tasks: 0
Adding item: 1
add_items - Processing queue size: 1. Remaining tasks: 1
Adding item: 2
add_items - Processing queue size: 2. Remaining tasks: 2
Adding item: 3
add_items - Processing queue size: 3. Remaining tasks: 3
Adding item: 4
add_items - Processing queue size: 4. Remaining tasks: 4
Adding item: 5
add_items - Processing queue size: 5. Remaining tasks: 5

<AT THIS POINT THE PROGRAM HALTS WITH THE PRODUCER THREAD WAITING FOR A FREE SPOT IN THE QUEUE>

The consumer thread exits right after the first item is processed: the unfinished_tasks count went to zero and stayed that way for one second, before the producer had a chance to insert a new element. At that moment, the consumer assumed that there were no more items, and it exited, stopping item extraction entirely. The producer thread then cannot insert more elements after the 5th element, because the queue has maxsize 5, and the program halts indefinitely.

Using Queue.join() in main() to Wait for Queue Completion

You might wonder why I did not use the Queue.join() facility provided by the Python implementation of queues, and used Thread.join() in the main thread instead.

From the documentation:

Quote:
Queue.join()

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

From the implementation of Queue.join():

Python
def join(self):
    '''Blocks until all items in the Queue have been gotten and processed.

    The count of unfinished tasks goes up whenever an item is added to the
    queue. The count goes down whenever a consumer thread calls task_done()
    to indicate the item was retrieved and all work on it is complete.
    When the count of unfinished tasks drops to zero, join() unblocks.
    '''
    with self.all_tasks_done:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()

So it waits until the unfinished task count reaches zero. If we substitute the thread joins with the queue join in the main() method in our code, we have:

Python
def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, args=(test_queue, 10), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), name="process_items_thread")
    t_process.start()

    # Replaced the thread joins below with Queue.join()
    # t_add.join()
    # t_process.join()
    test_queue.join() # New
    print("main - Finished")

Executing the program now, we will see that the main thread also exits prematurely when the unfinished task count reaches zero, just as the consumer thread did in the previous example (again caused by the slow generation of items in the producer thread during the initial stages of the execution). The execution of the program produces the following log:

Quote:

Adding item: 0
Processing item: 0
process_items - Processing queue size: 0. Remaining tasks: 0
main - Finished
process_items - Exiting thread

add_items - Processing queue size: 0. Remaining tasks: 0
Adding item: 1
add_items - Processing queue size: 1. Remaining tasks: 1
Adding item: 2
add_items - Processing queue size: 2. Remaining tasks: 2
Adding item: 3
add_items - Processing queue size: 3. Remaining tasks: 3
Adding item: 4
add_items - Processing queue size: 4. Remaining tasks: 4
Adding item: 5
add_items - Processing queue size: 5. Remaining tasks: 5

<AT THIS POINT THE PROGRAM HALTS WITH THE PRODUCER THREAD WAITING FOR A FREE SPOT IN THE QUEUE>

The main thread does not wait for the running threads to finish if we use Queue.join(). Both Queue.join() and Queue.unfinished_tasks are brittle indicators of completion: the number of unfinished tasks depends on the relative production/consumption speeds, so it cannot be deemed a good enough indicator of completion in the general case.

Using Daemonic Threads to Avoid Halting

We could try to avoid the thread halting issues using daemonic threads. This is the approach taken in the example in the Queue class documentation. This way, when the main thread finishes, all remaining daemonic threads are stopped. I do not recommend using them in the general case, if you plan to use resources (system handles) from the computer in the threads accessing the queue. I'll explain why now.

From the documentation:

Quote:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If not None, daemon explicitly sets whether the thread is daemonic. If None (the default), the daemonic property is inherited from the current thread.

If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

Also:

Quote:

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property or the daemon constructor argument.

Note

Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.

There is a “main thread” object; this corresponds to the initial thread of control in the Python program. It is not a daemon thread.

There is the possibility that “dummy thread objects” are created. These are thread objects corresponding to “alien threads”, which are threads of control started outside the threading module, such as directly from C code. Dummy thread objects have limited functionality; they are always considered alive and daemonic, and cannot be join()ed. They are never deleted, since it is impossible to detect the termination of alien threads.

This last bit of documentation is telling us that, if we use daemon threads, all our threads will be abruptly interrupted at shutdown, and resources could be left open, or in an inconsistent state. The code from the Python documentation example should therefore be modified when you are dealing with resources in the threads, and you should avoid using daemonic threads. The documentation gives us the hint to use an Event to stop the threads properly instead.

If you leave daemon as None (the default) during thread creation, the thread inherits the daemonic property from the creating thread; since the main thread is non-daemonic, threads created from it are non-daemonic unless you explicitly pass daemon=True.
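As a quick check of that inheritance rule (a standalone snippet, not part of our program), a thread created from the non-daemonic main thread reports daemon as False unless we override it:

Python
import threading

t = threading.Thread(target=print, args=("hello",))
print(t.daemon)   # False: inherited from the non-daemonic main thread

d = threading.Thread(target=print, args=("hello",), daemon=True)
print(d.daemon)   # True: explicitly set in the constructor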

As an example of how it will run with daemonic threads, we can test with a modified version of the main() method:

Python
def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, args=(test_queue, 10), name="add_items_thread", daemon=True)
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), name="process_items_thread", daemon=True)
    t_process.start()

    # Replaced the thread joins below with Queue.join()
    # t_add.join()
    # t_process.join()
    test_queue.join() # New
    print("main - Finished")

Resulting logs:

Quote:

Adding item: 0
Processing item: 0
process_items - Processing queue size: 0. Remaining tasks: 0
main - Finished
process_items - Exiting thread

<THE PROGRAM EXITED, KILLING ALL THREADS>

The program exits directly after the first item is processed. The consumer thread finished properly, but we never saw the producer's "add_items - Exiting thread" message. Any code managing resources in that thread might not have run, leaving those resources unreleased or in an inconsistent state.

What We Have Found Out So Far

After all the changes in the code, and the shortcomings we have found in our program in this and the previous article, I think it makes sense to enumerate the things we need to take into consideration for the implementation of producer/consumer applications with queues and threads.

  • Our code using queues should be independent of the relative speeds of item generation and consumption. Producers should be allowed to produce elements at any rate, regardless of the consumption rate, and the consumers should be able to detect completion properly. The items to process could need time to be generated, or have to be extracted from a database or a remote HTTP server. The consumer threads should be allowed to process items slowly or very fast, without affecting the detection of the completion status.
  • Our code using queues should avoid daemonic threads. Daemonic threads cause issues with open external resources (files, memory, sockets, etc.).
  • The application should wait for completion before exiting, or at least wait for the producer thread to stop producing items. We will use the Event class to handle synchronisation between threads (to mark thread completion, etc.), and we will avoid using Queue.join() in the main() method as it is implemented in Python (we can still override the join method in our own queue implementation). A minimal sketch of this Event-based approach follows this list.
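To make that last point concrete, here is a minimal sketch of the direction the next article will take. It assumes a producer_done threading.Event shared by both threads, which the producer sets after its last put(); the timed get lets the consumer wake up periodically to re-check the exit condition:

Python
import queue

def process_items(processing_queue, producer_done):
    # Keep consuming until the producer has finished AND the queue is drained
    while not (producer_done.is_set() and processing_queue.empty()):
        try:
            # Timed get: wake up every 0.5 seconds to re-check the exit condition
            x = processing_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        print("Processing item: {}".format(x))
        processing_queue.task_done()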

Recap

After adding a consumer thread, we realised that the Python queue implementation provides some of the methods and values required to detect completion, but that they do not suffice, and we will need to add Python events. The use of daemonic threads would solve some of the problems, but could create more if our queue program manages system resources, so we will avoid using them. In my next article in the series on queues, I will discuss how to synchronise and stop the producer and consumer threads acting on queues in a safe way, without the use of daemon threads, following the considerations listed above.

In my next article, we will continue adding improvements to sort out the issues we found.

History

  • 15th June, 2020: Initial version
  • 21st August, 2020: Added link to next article in recap

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)