In this article, I explain from the start, in a discovery fashion, how I use Python queues in a concurrent execution setting: beginning with a basic Python Queue, and then adding complexity to meet specific requirements.
Introduction
This is the first in a series of Python articles explaining the purpose of the Python Queue class, and its uses in a real project.
The Queue Data Structure
The queue data structure stores elements in a linear sequence and allows 2 operations: push and pop.
push inserts an element at the end of the queue (as the last element). pop removes the first element of the queue and returns it to the caller.
You can start with an empty queue and push 2 elements (first elem1 and then elem2), then pop twice to get the two elements back in the same order. The queue structure accesses the elements in a FIFO sequence (first-in-first-out). In theory, the structure can contain an unlimited number of elements, but of course memory is limited, so we will usually set a limit with the maxsize of the queues we create.
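The push/pop sequence described above can be sketched with Python's own queue.Queue, where push is spelled put and pop is spelled get. This is only a minimal illustration; the maxsize of 2 and the element values are arbitrary choices made for the example.

```python
import queue

# maxsize=2 is an arbitrary limit chosen for this illustration.
fifo = queue.Queue(maxsize=2)

fifo.put("elem1")    # push: elem1 goes to the end of the (empty) queue
fifo.put("elem2")    # push: elem2 is placed behind elem1

first = fifo.get()   # pop: removes and returns the oldest element
second = fifo.get()  # pop: removes and returns the next one

print(first, second)  # elem1 elem2
print(fifo.empty())   # True
```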
The Python Queue Implementation
Funnily enough, Python implements the queue data structure (FIFO sequence), the stack data structure (LIFO sequence), and the priority queue data structure (a sequence whose ordering depends on a priority parameter and a comparison rule applied on push) under the same generic name (queue), and in the same module.
From the documentation on the queue constructors:
Quote:
- class queue.Queue(maxsize=0)
  Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
- class queue.LifoQueue(maxsize=0)
  Constructor for a LIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
- class queue.PriorityQueue(maxsize=0)
  Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
  The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
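To make the three retrieval orders in the quoted documentation concrete, here is a small sketch that pushes the same entries into each class and reads them back. The drain helper and the sample (priority_number, data) tuples are hypothetical, introduced only for this comparison.

```python
import queue


def drain(q):
    """Remove and return every item currently in q, in retrieval order.

    Hypothetical helper; relying on empty() is only safe here because
    a single thread is using the queue.
    """
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items


entries = [(2, "b"), (1, "a"), (3, "c")]  # (priority_number, data) tuples

fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for q in (fifo, lifo, prio):
    for entry in entries:
        q.put(entry)

fifo_order = drain(fifo)  # insertion order
lifo_order = drain(lifo)  # reverse insertion order
prio_order = drain(prio)  # lowest valued entry first

print(fifo_order)  # [(2, 'b'), (1, 'a'), (3, 'c')]
print(lifo_order)  # [(3, 'c'), (1, 'a'), (2, 'b')]
print(prio_order)  # [(1, 'a'), (2, 'b'), (3, 'c')]
```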
I will focus on the queue data structure for the moment (FIFO sequence).
What Can I Use Queues For?
I am especially interested in using additional properties of the Python implementation of the queue data structures:
- They are thread-safe. Multiple threads can try to access the queue at the same time (e.g., to pop an element from the queue). This property means that the queue ensures that only one thread at a time acquires the queue for an operation. The calling thread can pop or push an element in blocking mode, with a timeout if needed. This simplifies the implementation of complex workflows, as we do not need to code synchronisation primitives to deal with concurrent execution.
- They have a maxsize: we can limit the throughput of the inserting threads by setting the maximum number of elements in the queue to a value N (N>0). A thread attempting to insert element N+1 into a queue of maxsize N (the queue-full situation) will block or fail, depending on the flavour of the insert.
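As a rough illustration of the thread-safety property, the sketch below lets several threads insert into one shared queue without any explicit lock. The producer function, the number of threads, and the item counts are assumptions made for this example only.

```python
import queue
import threading

shared = queue.Queue()  # maxsize=0: no upper bound on the number of items


def producer(q, start, count):
    # Each thread inserts its own range of integers; no lock is needed,
    # because Queue.put synchronises access internally.
    for x in range(start, start + count):
        q.put(x)


threads = [
    threading.Thread(target=producer, args=(shared, i * 250, 250))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared.qsize())  # 1000: no insertion was lost
```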
Together with concurrency, I am interested in using the Python queues to:
- Decouple processing. From a producer thread, I can queue items for processing by a separate task in a different thread, and continue with the producer thread activities without the need to wait for the processing task to finish.
- Serialise processing. I can have several threads producing elements to be processed, and have them queued to be processed.
In both cases, the queue can act as a buffer of limited size (maxsize) for processing.
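A minimal sketch of the decoupling idea, assuming a single worker thread and a None sentinel to tell it to stop. The worker function, the sentinel convention, and the squaring "processing" step are illustrative choices, not something this article prescribes.

```python
import queue
import threading


def worker(q, results):
    # Hypothetical processing task: squares each item until it sees
    # the None sentinel.
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        results.append(item * item)


work_queue = queue.Queue(maxsize=3)  # small buffer between the two threads
results = []
t = threading.Thread(target=worker, args=(work_queue, results))
t.start()

# The producer only waits when the 3-slot buffer is full; otherwise it
# carries on without waiting for each item to be processed.
for x in range(10):
    work_queue.put(x)
work_queue.put(None)  # ask the worker to stop
t.join()

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

With several producer threads putting into the same queue, the single worker would still handle the items one at a time, which is the serialising effect mentioned above.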
Queue Code Examples
Method to Add an Element to a Queue
From the Python documentation:
Quote:
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
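The timeout behaviour in the quoted documentation can be tried out with a queue that is already full. This is a small sketch; the maxsize of 1 and the 0.2 second timeout are arbitrary values picked for the demonstration.

```python
import queue
import time

q = queue.Queue(maxsize=1)
q.put("first")  # the queue is now full

start = time.monotonic()
try:
    # Blocks for at most 0.2 s waiting for a free slot, then raises Full.
    q.put("second", timeout=0.2)
    timed_out = False
except queue.Full:
    timed_out = True
elapsed = time.monotonic() - start

print(timed_out)         # True
print(round(elapsed, 1)) # roughly 0.2
```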
A method to insert without blocking also exists:
Queue.put_nowait(x)
This call is equivalent to:
Queue.put(x, block=False)
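A quick way to check that equivalence is to call both forms on a full queue and compare what happens. The sketch below assumes a queue of maxsize 1; the outcomes list is just a hypothetical way to record each result.

```python
import queue

q = queue.Queue(maxsize=1)
q.put_nowait("a")  # succeeds: the queue had a free slot

outcomes = []
attempts = (
    lambda: q.put_nowait("b"),
    lambda: q.put("b", block=False),
)
for attempt in attempts:
    try:
        attempt()
        outcomes.append("inserted")
    except queue.Full:
        outcomes.append("Full")

print(outcomes)  # ['Full', 'Full']: both forms fail immediately
```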
Example 1: Adding Elements to a Queue, Blocking
This example adds elements to a queue.
The main() function contains the logic of the program. It creates a queue (test_queue) with maxsize 5, limiting the number of elements that it can hold at any one time.
I created a named thread to insert elements in a loop. The thread function add_items receives two parameters: the queue to insert into, and the total number of elements to insert (the loop bound). For this test, the elements are integers, but they could be any Python object. I introduce threads at this early stage because I intend to show the interactions between thread-safe queues and threads from the start. I also plan to extend the implementation with other facilities that require threads.
The code starts the thread for item insertion and then joins it, to wait for its completion before exiting the program. The thread starts filling the queue (test_queue) using put calls (the Python name for the push operation), with 100 elements to be inserted. By default, put blocks the inserting thread with no timeout (it waits indefinitely). This can be changed using the block and timeout parameters.
import queue
import threading
import time

def add_items(processing_queue, num_items):
    # Insert num_items integers, pausing 0.1 s before each insertion.
    for x in range(num_items):
        time.sleep(.1)
        # Blocking put (the default): waits while the queue is full.
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        print("Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))

def main():
    test_queue = queue.Queue(5)  # maxsize of 5 elements
    t_add = threading.Thread(target=add_items, args=(test_queue, 100), name="add_items_thread")
    t_add.start()
    t_add.join()  # wait for the inserting thread to finish

if __name__ == "__main__":
    tick = time.time()
    main()
    print("Execution time: {} seconds.".format(round(time.time() - tick, 2)))
After each insert, I print the size of the queue (number of elements in it) and the number of unfinished tasks (we will dig into this number later on in this series). The maximum size of the queue in this example is five elements. On my computer, the execution produces this sequence of logs:
Quote:
Adding item: 0
Processing queue size: 1. Remaining tasks: 1
Adding item: 1
Processing queue size: 2. Remaining tasks: 2
Adding item: 2
Processing queue size: 3. Remaining tasks: 3
Adding item: 3
Processing queue size: 4. Remaining tasks: 4
Adding item: 4
Processing queue size: 5. Remaining tasks: 5
As you can see, after inserting five elements, the queue is full. The put calls were blocking, with no timeout. As nothing is extracting items from the queue, the next put blocks indefinitely, waiting for a free slot, so the program never reaches its end.
Example 2: Adding Elements to a Queue, Non-Blocking
If we make the put non-blocking:
def add_items(processing_queue, num_items):
    for x in range(num_items):
        time.sleep(.1)
        # Non-blocking put: raises queue.Full if there is no free slot.
        processing_queue.put(x, block=False)
        print("Adding item: {}".format(x))
        print("Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))
... we get a queue.Full exception:
Quote:
Adding item: 0
Processing queue size: 1. Remaining tasks: 1
Adding item: 1
Processing queue size: 2. Remaining tasks: 2
Adding item: 2
Processing queue size: 3. Remaining tasks: 3
Adding item: 3
Processing queue size: 4. Remaining tasks: 4
Adding item: 4
Processing queue size: 5. Remaining tasks: 5
Exception in thread add_items_thread:
Traceback (most recent call last):
File "C:\dev\python36\lib\threading.py", line 916, in _bootstrap_inner
self.run()
File "C:\dev\python36\lib\threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "D:\Documents\Personal\Sources\PythonScripts\Blog\Python\Queues\Article 1\Queues - Article 01.py", line 12, in AddItems
processing_queue.put(x, block=False)
File "C:\dev\python36\lib\queue.py", line 130, in put
raise Full
queue.Full
Execution time: 6.1 seconds.
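In the log above, the uncaught exception terminates the inserting thread. One possible way to keep the thread alive is to catch queue.Full around the non-blocking put. The sketch below is only an illustration: add_items_safely and the rejected list are hypothetical names, and recording the rejected items is just one of several reasonable reactions (retrying later or dropping the item are others).

```python
import queue
import threading


def add_items_safely(processing_queue, num_items, rejected):
    # Hypothetical variant of add_items: instead of letting queue.Full
    # end the thread, remember which items could not be inserted.
    for x in range(num_items):
        try:
            processing_queue.put(x, block=False)
        except queue.Full:
            rejected.append(x)


test_queue = queue.Queue(5)
rejected = []
t_add = threading.Thread(
    target=add_items_safely,
    args=(test_queue, 8, rejected),
    name="add_items_thread",
)
t_add.start()
t_add.join()

print(test_queue.qsize())  # 5
print(rejected)            # [5, 6, 7]
```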
Recap
I created two code examples for inserting into the queue with put: one with blocking enabled (the default), and another without blocking, to see the different results. As nothing extracts elements from the queue, the results are:
- Blocking put: the inserting thread blocks after 5 insertions, because the queue has maxsize 5.
- Non-blocking put: the code throws an exception (queue.Full) when trying to exceed the queue's limit of five elements.
What Now?
We started very slowly, as inserting elements in a queue is a simple task, but things will get hairy when we add more threads and functionality to our code. We will continue in the next article in the series, adding a processing thread that extracts elements from the queue, and we will see what happens to the number of elements in the queue and the number of unfinished tasks.
History
- 4th June, 2020: Initial version