
Discovering Python Step by Step - Using Python Queues - 01 - Adding Elements to a Queue

18 Jun 2020, CPOL, 7 min read
How to add elements to a queue in Python
In this article, I would like to explain, from the start and step by step, how I use Python queues in a concurrent execution setting. I begin with basic Python queues and then add complexity to meet specific requirements.

Introduction

This is the first in a series of Python articles explaining the purpose of the Python Queue class, and its uses in a real project.

The Queue Data Structure

The queue is a data structure that stores elements in a linear sequence and allows two operations: push and pop.

  • push inserts an element in the queue, at the end of the queue (last element).
  • pop removes the first element of the queue and returns it to the caller.

You can start with an empty queue, push two elements (first elem1 and then elem2), and then pop twice to get the two elements back in the same order. The queue structure accesses its elements in a FIFO (first-in-first-out) sequence. In theory, the structure can hold an unlimited number of elements, but in practice memory is limited, so we usually set a limit on the maxsize of the queues we create.
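As a minimal sketch of this push/pop sequence, assuming Python's queue.Queue, where put plays the role of push and get plays the role of pop:

```python
import queue

# A FIFO queue with room for at most 2 elements
q = queue.Queue(maxsize=2)

q.put("elem1")  # push: goes to the end of the queue
q.put("elem2")

first = q.get()   # pop: removes and returns the first element
second = q.get()
print(first, second)  # elem1 elem2
print(q.empty())      # True
```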

The Python Queue Implementation

Funnily enough, Python implements the queue data structure (FIFO sequence), the stack data structure (LIFO sequence), and the priority queue data structure (a sequence ordered by comparing the entries pushed into it) under the same generic name (queue), all in the same module.

From the documentation on the queue constructor:

Quote:
class queue.Queue(maxsize=0)

Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

class queue.LifoQueue(maxsize=0)

Constructor for a LIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

class queue.PriorityQueue(maxsize=0)

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
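To see the three retrieval orders side by side, the sketch below pushes the same (priority_number, data) tuples into a Queue, a LifoQueue and a PriorityQueue, then pops everything out of each. The drain helper is a hypothetical name introduced only for this illustration. Calling empty() in a loop like this is only reliable here because a single thread uses the queues.

```python
import queue

def drain(q):
    """Pop every element currently in q and return them in retrieval order."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items

fifo = queue.Queue()
lifo = queue.LifoQueue()
prio = queue.PriorityQueue()

# Push the same (priority_number, data) tuples, in the same order, into all three
for entry in [(3, "c"), (1, "a"), (2, "b")]:
    fifo.put(entry)
    lifo.put(entry)
    prio.put(entry)

fifo_order = drain(fifo)
lifo_order = drain(lifo)
prio_order = drain(prio)
print(fifo_order)  # [(3, 'c'), (1, 'a'), (2, 'b')]  insertion order
print(lifo_order)  # [(2, 'b'), (1, 'a'), (3, 'c')]  reverse insertion order
print(prio_order)  # [(1, 'a'), (2, 'b'), (3, 'c')]  lowest priority_number first
```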

I will focus on the queue data structure for the moment (FIFO sequence).

What Can I Use Queues For?

I am especially interested in some additional properties of the Python implementation of the queue data structures:

  • They are thread-safe. Multiple threads may try to access the queue at the same time (e.g., to pop an element). The queue ensures that only one thread performs an operation on it at any given time. A calling thread can push or pop an element in blocking mode, optionally with a timeout. This simplifies the implementation of complex workflows, as we do not need to code our own synchronisation primitives to deal with concurrent execution.
  • They have a maxsize: we can limit the throughput of the inserting threads by setting the maximum number of elements in the queue to a value N (N>0). A thread attempting to insert element N+1 into a queue of maxsize N (the queue-full situation) will either block or get a queue.Full exception, depending on the flavour of the insert.
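A quick way to see the thread-safety property in action is to let several threads put into one shared queue without any lock of our own. In this sketch, the thread count, the item count and the producer function are illustrative assumptions. Once every thread has been joined, the queue should hold exactly one entry per put call.

```python
import queue
import threading

# Illustrative values: 4 producer threads, 1000 puts each
NUM_THREADS = 4
ITEMS_PER_THREAD = 1000

shared_queue = queue.Queue()  # unbounded, shared by all producers

def producer(thread_id):
    for i in range(ITEMS_PER_THREAD):
        shared_queue.put((thread_id, i))  # no explicit lock needed

threads = [threading.Thread(target=producer, args=(n,)) for n in range(NUM_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# After all joins, no put has been lost despite the concurrent access
total = shared_queue.qsize()
print(total)  # 4000
```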

Together with concurrency, I am interested in using the Python queues to:

  • Decouple processing. From a producer thread, I can queue items for processing by a separate task in a different thread, and continue with the producer thread activities without the need to wait for the processing task to finish.
  • Serialise processing. I can have several threads producing elements, and have those elements queued so they are processed in sequence.

In both cases, the queue can act as a buffer of limited size (maxsize) for processing.
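The decoupling and buffering ideas can be sketched with one producer and one consumer sharing a bounded queue. This is a minimal sketch, not the implementation developed later in this series. It assumes a None sentinel to tell the consumer to stop, uses get (Python's pop operation) on the consumer side, and multiplies each item by 10 as a stand-in for real processing.

```python
import queue
import threading

# Bounded buffer: the producer can run at most 3 items ahead of the consumer
buffer = queue.Queue(maxsize=3)
SENTINEL = None  # assumption: a marker object telling the consumer to stop
processed = []

def consumer():
    while True:
        item = buffer.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        processed.append(item * 10)  # stand-in for real processing work

worker = threading.Thread(target=consumer, name="consumer_thread")
worker.start()

# Producer (here the main thread): put blocks only while the buffer is full
for x in range(10):
    buffer.put(x)
buffer.put(SENTINEL)

worker.join()
print(processed)  # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
```

With a single consumer, items are processed in the order they were queued, while the producer only ever waits when the buffer is full.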

Queue Code Examples

Method to Add an Element to a Queue

From the Python documentation:

Quote:
Queue.put(item, block=True, timeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
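The timeout behaviour described above can be checked with a queue of maxsize 1: once the only slot is taken, a put with a timeout waits that long and then raises queue.Full. The 0.5-second value is an arbitrary choice for this sketch.

```python
import queue
import time

q = queue.Queue(maxsize=1)
q.put("first")  # takes the only free slot

waited = None
start = time.monotonic()
try:
    # block=True is the default; wait at most 0.5 seconds for a free slot
    q.put("second", timeout=0.5)
except queue.Full:
    waited = time.monotonic() - start
    print("queue.Full raised after about {:.1f} seconds".format(waited))
```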

A method to insert without blocking exists:

Python
Queue.put_nowait(x)

this call is equivalent to...

Python
Queue.put(x, block=False)
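A short sketch showing that put_nowait and put with block=False behave the same way on a full queue: both raise queue.Full immediately. The try_insert helper is hypothetical, written only for this comparison.

```python
import queue

def try_insert(insert, item):
    """Return True if insert(item) succeeded, False if the queue was full."""
    try:
        insert(item)
        return True
    except queue.Full:
        return False

q = queue.Queue(maxsize=1)
r1 = try_insert(q.put_nowait, "a")                     # free slot available
r2 = try_insert(q.put_nowait, "b")                     # queue already full
r3 = try_insert(lambda x: q.put(x, block=False), "c")  # same behaviour

print(r1, r2, r3)  # True False False
```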

Example 1: Adding Elements to a Queue, Blocking

This is example code that adds elements to a queue.

The main() function contains the logic of the program. A queue is created (test_queue), with maxsize 5, limiting the number of elements that can be inserted into it.

I created a named thread to insert elements in a loop. The thread method add_items receives two parameters: the queue to insert into, and the total number of elements to insert (the upper bound of the loop). For this test, the elements are integers, but they could be any Python object. I introduce threads in the code right away, because I want to show the interactions between thread-safe queues and threads from the start, and because I plan to extend the implementation with other facilities that require threads.

The code starts the insertion thread and joins it, waiting for it to complete before exiting the program. The thread starts filling the queue (test_queue) using put calls (Python's name for the push operation), attempting to insert 100 elements. By default, put blocks the inserting thread while the queue is full, with no timeout; the block and timeout parameters change this behaviour.

Python
import queue
import threading
import time

# Discovering Python step by step - Using Python Queues - 01 - Queues - Sample 01

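def add_items(processing_queue, num_items):
    # Insert num_items integers into the queue, one every 0.1 seconds
    for x in range(num_items):
        time.sleep(.1)
        # Blocking put (default): waits with no timeout while the queue is full
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        # unfinished_tasks counts items put but not yet marked done via task_done()
        print("Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))

def main():
    # FIFO queue with maxsize 5
    test_queue = queue.Queue(5)
    t_add = threading.Thread(target=add_items, args=(test_queue, 100), name="add_items_thread")
    t_add.start()
    # Wait for the insertion thread to finish before exiting
    t_add.join()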

if __name__ == "__main__":
    tick =  time.time()
    main()
    print("Execution time: {} seconds.".format(round(time.time() - tick, 2)))

After each insert, I print the size of the queue (the number of elements in it) and the number of unfinished tasks (we will dig into this number later in this series). The maximum size of the queue in this example is five elements. On my computer, the execution produces this sequence of logs:

Quote:

Adding item: 0
Processing queue size: 1. Remaining tasks: 1
Adding item: 1
Processing queue size: 2. Remaining tasks: 2
Adding item: 2
Processing queue size: 3. Remaining tasks: 3
Adding item: 3
Processing queue size: 4. Remaining tasks: 4
Adding item: 4
Processing queue size: 5. Remaining tasks: 5

As you can see, after five elements have been inserted, the queue is full. The puts are blocking, with no timeout. As nothing is extracting items from the queue, the sixth put blocks indefinitely, waiting for a free slot to insert the next element, so the thread (and therefore the program) never finishes.
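Because the blocked put in Example 1 never returns, join never returns either, and the program has to be interrupted by hand. One possible variant, assuming we prefer to give up after a while, passes a timeout to put and stops the thread when queue.Full is raised. The timeout parameter of add_items and its 1-second default are assumptions added for this sketch, and the sleep and size logs are dropped to keep it short.

```python
import queue
import threading

def add_items(processing_queue, num_items, timeout=1.0):
    """Like add_items in Example 1, but gives up when the queue stays full."""
    for x in range(num_items):
        try:
            # Blocking put, but for at most `timeout` seconds
            processing_queue.put(x, timeout=timeout)
        except queue.Full:
            print("Queue still full after {} s, stopping at item {}".format(timeout, x))
            return
        print("Adding item: {}".format(x))

def main():
    test_queue = queue.Queue(5)
    t_add = threading.Thread(target=add_items, args=(test_queue, 100), name="add_items_thread")
    t_add.start()
    t_add.join()  # now returns about 1 second after the queue fills up
    return test_queue

if __name__ == "__main__":
    final_queue = main()
    print("Final queue size: {}".format(final_queue.qsize()))  # 5
```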

Example 2: Adding Elements to a Queue, Non-Blocking

If we made the put non-blocking:

Python
def add_items(processing_queue, num_items):
    for x in range(num_items):
        time.sleep(.1)
        # Non-blocking put: raises queue.Full immediately if the queue is full
        processing_queue.put(x, block=False)
        print("Adding item: {}".format(x))
        print("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))

... a queue.Full exception would have been raised in the inserting thread:

Quote:

Adding item: 0
Processing queue size: 1. Remaining tasks: 1
Adding item: 1
Processing queue size: 2. Remaining tasks: 2
Adding item: 2
Processing queue size: 3. Remaining tasks: 3
Adding item: 3
Processing queue size: 4. Remaining tasks: 4
Adding item: 4
Processing queue size: 5. Remaining tasks: 5
Exception in thread add_items_thread:
Traceback (most recent call last):
  File "C:\dev\python36\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "C:\dev\python36\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "D:\Documents\Personal\Sources\PythonScripts\Blog\Python\Queues\Article 1\Queues - Article 01.py", line 12, in AddItems
    processing_queue.put(x, block=False)
  File "C:\dev\python36\lib\queue.py", line 130, in put
    raise Full
queue.Full

Execution time: 6.1 seconds.
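If the program should survive a full queue instead of letting the insertion thread die with a traceback, the non-blocking put can be wrapped in a try/except queue.Full block. This is a minimal sketch, called directly from the main thread for brevity. Returning the number of inserted items is an addition for this illustration.

```python
import queue

def add_items(processing_queue, num_items):
    """Non-blocking variant of add_items that stops cleanly on queue.Full.

    Returns the number of items actually inserted.
    """
    added = 0
    for x in range(num_items):
        try:
            processing_queue.put(x, block=False)
        except queue.Full:
            print("Queue full (size {}), item {} not added".format(
                processing_queue.qsize(), x))
            break
        added += 1
    return added

test_queue = queue.Queue(5)
inserted = add_items(test_queue, 100)
print("Inserted {} items".format(inserted))  # Inserted 5 items
```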

Recap

I created two code examples of insertion into the queue with put: one with blocking enabled (the default), and one without blocking, to compare the results. Because nothing extracts elements from the queue, the results are:

  • Blocking put: the inserting thread blocks on the sixth insertion, because the queue has maxsize 5 and is already full.
  • Non-blocking put: the put call raises an exception (queue.Full) when trying to exceed the limit of five elements in the queue.

What Now?

We started slowly, as inserting elements into a queue is a simple task, but things will get hairy when we add more threads and functionality to our code. In the next article in the series, we will add a processing thread that extracts elements from the queue, and we will see what happens to the number of elements in the queue and the number of unfinished tasks.

History

  • 4th June, 2020: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)