
Discovering Python step by step - Using Python Queues - 03 - Detecting Queue Completion Properly

20 Oct 2020, CPOL, 13 min read
How to detect queue completion from other threads properly when using Python queues.
The main function and the processing threads need Events to detect completion reliably when using Python queues. I will show how here.

Background

In my previous article about queues, we discovered that, for the general case, we need more than the functionality provided by Python's queue implementation. In the process, I listed the minimal requirements for our program. Here they are again:

Our application...

  • Should be independent of the relative speed of item generation and consumption. Producers should be allowed to produce elements at any rate, regardless of the consumption rate, and the consumers should be able to detect completion properly. The items to process could take time to generate, or be extracted from a database or a remote HTTP server. The consumer threads should be allowed to process items slowly or very fast without affecting the detection of completion.
  • Should not use daemonic threads. Daemonic threads are stopped abruptly when the program exits, so open external resources (files, memory, sockets, etc.) may not be released properly.
  • The application should wait for completion before exiting, or at least wait for the producer thread to stop producing items. We will use the Event class to handle synchronisation between threads (to mark thread completion, etc.). We will avoid calling Queue.join() from main() as it is implemented in Python's standard library (we can still override the join method in our own queue implementation).
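As a quick refresher on the Event class mentioned above, here is a minimal sketch (the function names producer and demo_event are hypothetical, for illustration only). It shows that an Event starts cleared, that wait(timeout) returns False when it times out, and that it returns True once another thread has called set().

```python
import threading
import time

def producer(done_event):
    # Simulate some work, then announce completion to any waiting thread
    time.sleep(0.2)
    done_event.set()

def demo_event():
    done_event = threading.Event()
    print("set before start:", done_event.is_set())  # events start cleared
    t = threading.Thread(target=producer, args=(done_event,), name="producer")
    t.start()
    # wait() returns False if the timeout expires before set() is called...
    early = done_event.wait(timeout=0.01)
    # ...and True as soon as another thread calls set()
    late = done_event.wait(timeout=5)
    t.join()
    return early, late, done_event.is_set()

if __name__ == "__main__":
    print(demo_event())
```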

Example 1: Code from Last Article

We will start with the following code:

Python
import queue
import threading
import time

def add_items(processing_queue, num_items):
    for x in range(num_items):
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        time.sleep(1)
        print("add_items - Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    print("add_items - Exiting thread")

def process_items(processing_queue):
    while processing_queue.unfinished_tasks > 0:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        print("Processing item: {}".format(x))
        # simulate processing time. Much slower than insertion
        time.sleep(0.1)
        processing_queue.task_done()
        print("Item: {} processed.".format(x))
        print("process_items - Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    print("process_items - Exiting thread")

def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, 
            args=(test_queue, 10), name="add_items_thread")
    t_add.start()

    # processing thread
    t_process = threading.Thread(target=process_items, 
                args=(test_queue,), name="process_items_thread")
    t_process.start()

    t_add.join()
    t_process.join()
    print("main - Exiting thread")   

if __name__ == "__main__":
    main()

This is the code that stalled: the consumer thread exited as soon as the number of unfinished tasks reached zero for the first time (too early, before the producer had finished). After that, no more items were taken from the queue, so the producer eventually blocked on a full queue. Note that I reverted to joining the threads instead of using Queue.join().

Quote:

Adding item: 0
Processing item: 0
Item: 0 processed.
process_items - Processing queue size: 0. Remaining tasks: 0
process_items - Exiting thread
add_items - Processing queue size: 0. Remaining tasks: 0
Adding item: 1
add_items - Processing queue size: 1. Remaining tasks: 1
Adding item: 2
add_items - Processing queue size: 2. Remaining tasks: 2
Adding item: 3
add_items - Processing queue size: 3. Remaining tasks: 3
Adding item: 4
add_items - Processing queue size: 4. Remaining tasks: 4
Adding item: 5
add_items - Processing queue size: 5. Remaining tasks: 5

<AT THIS POINT THE PROGRAM HALTS WITH THE PRODUCER THREAD WAITING FOR A FREE SPOT IN THE QUEUE>
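The stall comes from the bounded queue: Queue(5) accepts at most five items, and once nobody calls get(), the next put() blocks indefinitely. The minimal sketch below (fill_bounded_queue is a hypothetical helper) reproduces this without any threads by giving put() a short timeout, so the blocked call raises queue.Full instead of hanging.

```python
import queue

def fill_bounded_queue(maxsize, attempts):
    """Put `attempts` items into a Queue(maxsize) with no consumer running.

    Returns (accepted items, first rejected item or None, qsize, unfinished_tasks).
    """
    q = queue.Queue(maxsize)
    accepted = []
    for x in range(attempts):
        try:
            # With a timeout, a full queue raises queue.Full instead of blocking forever
            q.put(x, timeout=0.05)
            accepted.append(x)
        except queue.Full:
            # No consumer is removing items, so no slot ever frees up
            return accepted, x, q.qsize(), q.unfinished_tasks
    return accepted, None, q.qsize(), q.unfinished_tasks

if __name__ == "__main__":
    print(fill_bounded_queue(5, 10))
```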

This code needs a couple of fixes. I will improve the logging and add an event to notify the other threads when the producer thread has finished producing items.

Example 2: Adding Proper Logging

I improved the way things are logged so we can follow the sequence of events better. First, we use the standard Python logging library. I added a configuration function, configure_logging, that is called at the beginning of the program and sets the log format to include the time (down to the millisecond) and the thread name. For now, I set the lowest log level to INFO.

Python
import queue
import threading
import time
import logging

def configure_logging():
    log_format="%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt='%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )

def add_items(processing_queue, num_items):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    logging.info("Exiting thread")

def process_items(processing_queue):
    while processing_queue.unfinished_tasks > 0:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        logging.info("Processing item: {}".format(x))
        # simulate processing time. Much slower than insertion
        time.sleep(0.1)
        processing_queue.task_done()
        logging.info("Item: {} processed.".format(x))
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    logging.info("Exiting thread")

def main():
    configure_logging()
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, args=(test_queue, 10), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), 
                                 name="process_items_thread")
    t_process.start()

    t_add.join()
    t_process.join()
    logging.info("Exiting thread")
   

if __name__ == "__main__":
    main()

Result:

Quote:

2020-06-22 18:56:34.797|add_items_thread |Adding item: 0
2020-06-22 18:56:34.798|process_items_thread |Processing item: 0
2020-06-22 18:56:34.899|process_items_thread |Item: 0 processed.
2020-06-22 18:56:34.903|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:56:34.905|process_items_thread |Exiting thread
2020-06-22 18:56:35.799|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:56:35.799|add_items_thread |Adding item: 1
2020-06-22 18:56:36.801|add_items_thread |Processing queue size: 1. Remaining tasks: 1
2020-06-22 18:56:36.805|add_items_thread |Adding item: 2
2020-06-22 18:56:37.807|add_items_thread |Processing queue size: 2. Remaining tasks: 2
2020-06-22 18:56:37.808|add_items_thread |Adding item: 3
2020-06-22 18:56:38.809|add_items_thread |Processing queue size: 3. Remaining tasks: 3
2020-06-22 18:56:38.810|add_items_thread |Adding item: 4
2020-06-22 18:56:39.811|add_items_thread |Processing queue size: 4. Remaining tasks: 4
2020-06-22 18:56:39.813|add_items_thread |Adding item: 5
2020-06-22 18:56:40.815|add_items_thread |Processing queue size: 5. Remaining tasks: 5

<AT THIS POINT THE PROGRAM HALTS WITH THE PRODUCER THREAD WAITING FOR A FREE SPOT IN THE QUEUE>
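To see how the format string used in configure_logging produces the thread-name column in these logs, here is a small sketch that captures records in memory instead of printing them. The helper capture_thread_logs and the logger name "queue_demo" are hypothetical; the format string and datefmt are the ones from the article.

```python
import io
import logging
import threading

LOG_FORMAT = "%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"

def capture_thread_logs():
    # Send records to an in-memory stream so the formatted lines can be inspected
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt="%Y-%m-%d %H:%M:%S"))
    logger = logging.getLogger("queue_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep these demo records out of the root logger
    logger.addHandler(handler)
    try:
        # The name passed to threading.Thread is what %(threadName)s prints
        t = threading.Thread(target=logger.info, args=("Adding item: 0",),
                             name="add_items_thread")
        t.start()
        t.join()
        logger.info("Exiting thread")  # logged from the calling thread
    finally:
        logger.removeHandler(handler)
    return stream.getvalue().splitlines()

if __name__ == "__main__":
    for line in capture_thread_logs():
        print(line)
```

The "-25" in %(threadName)-25s left-aligns the name in a 25-character column, which keeps the messages lined up.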

 

Example 3: Signaling the Stop of the Producer Thread with an Event

Now we will signal the end of the producer thread properly, using a threading.Event. When add_items has queued all its items, it signals that production is finished by calling event.set(). The consumer thread should stop once the event is set and no unfinished tasks remain.

Python
import queue
import threading
import time
import logging

def configure_logging():
    log_format="%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt='%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )

def add_items(processing_queue, num_items, queue_event):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    queue_event.set()
    logging.info("Exiting thread")

def process_items(processing_queue, queue_event):
    while (processing_queue.unfinished_tasks > 0 or not queue_event.is_set()):
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        logging.info("Processing item: {}".format(x))
        # simulate processing time. Much slower than insertion
        time.sleep(0.1)
        processing_queue.task_done()
        logging.info("Item: {} processed.".format(x))
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    logging.info("Exiting thread")

def main():
    configure_logging()
    test_queue = queue.Queue(5)

    work_finished_event      = threading.Event()

    # insertion thread
    t_add = threading.Thread(target=add_items, 
            args=(test_queue, 10, work_finished_event), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, 
                args=(test_queue, work_finished_event), name="process_items_thread")
    t_process.start()

    t_add.join()
    t_process.join()
    logging.info("Exiting thread")
   

if __name__ == "__main__":
    main()

The result of the execution is the following:

Quote:

2020-06-22 18:57:36.748|add_items_thread |Adding item: 0
2020-06-22 18:57:36.751|process_items_thread |Processing item: 0
2020-06-22 18:57:36.852|process_items_thread |Item: 0 processed.
2020-06-22 18:57:36.853|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:37.754|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:37.757|add_items_thread |Adding item: 1
2020-06-22 18:57:37.759|process_items_thread |Processing item: 1
2020-06-22 18:57:37.875|process_items_thread |Item: 1 processed.
2020-06-22 18:57:37.878|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:38.767|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:38.767|add_items_thread |Adding item: 2
2020-06-22 18:57:38.767|process_items_thread |Processing item: 2
2020-06-22 18:57:38.869|process_items_thread |Item: 2 processed.
2020-06-22 18:57:38.871|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:39.769|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:39.769|add_items_thread |Adding item: 3
2020-06-22 18:57:39.770|process_items_thread |Processing item: 3
2020-06-22 18:57:39.871|process_items_thread |Item: 3 processed.
2020-06-22 18:57:39.871|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:40.771|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:40.773|add_items_thread |Adding item: 4
2020-06-22 18:57:40.775|process_items_thread |Processing item: 4
2020-06-22 18:57:40.918|process_items_thread |Item: 4 processed.
2020-06-22 18:57:40.918|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:41.815|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:41.818|add_items_thread |Adding item: 5
2020-06-22 18:57:41.820|process_items_thread |Processing item: 5
2020-06-22 18:57:41.970|process_items_thread |Item: 5 processed.
2020-06-22 18:57:41.972|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:42.868|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:42.869|add_items_thread |Adding item: 6
2020-06-22 18:57:42.870|process_items_thread |Processing item: 6
2020-06-22 18:57:42.973|process_items_thread |Item: 6 processed.
2020-06-22 18:57:42.974|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:43.872|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:43.874|add_items_thread |Adding item: 7
2020-06-22 18:57:43.874|process_items_thread |Processing item: 7
2020-06-22 18:57:44.003|process_items_thread |Item: 7 processed.
2020-06-22 18:57:44.003|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:44.876|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:44.878|add_items_thread |Adding item: 8
2020-06-22 18:57:44.879|process_items_thread |Processing item: 8
2020-06-22 18:57:45.005|process_items_thread |Item: 8 processed.
2020-06-22 18:57:45.006|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:45.880|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:45.882|add_items_thread |Adding item: 9
2020-06-22 18:57:45.883|process_items_thread |Processing item: 9
2020-06-22 18:57:46.012|process_items_thread |Item: 9 processed.
2020-06-22 18:57:46.013|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:46.886|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:46.888|add_items_thread |Exiting thread

<AT THIS POINT THE PROGRAM HALTS WITH THE CONSUMER THREAD WAITING FOR A NEW ITEM TO BE QUEUED>

The consumer thread did not stop as we expected. This is due to the use of a blocking Queue.get() without a timeout. After processing the last item inserted into the queue (item 9), the consumer thread starts the next iteration before the producer has set the completion event (the producer sleeps for one second after each put()). The loop condition is still true, so the consumer blocks in Queue.get(), waiting for an item that will never arrive. To avoid this problem, we need to add a timeout to the blocking Queue.get().
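Before changing the consumer, it helps to see the timeout behaviour in isolation. This sketch (timed_get is a hypothetical helper) shows that Queue.get(timeout=...) returns immediately when an item is available, and raises queue.Empty after roughly the timeout when none arrives. That exception is what gives the consumer a chance to re-check the completion event.

```python
import queue
import time

def timed_get(q, timeout):
    """Return (item, elapsed_seconds); item is None if the queue stayed empty."""
    start = time.monotonic()
    try:
        item = q.get(timeout=timeout)
    except queue.Empty:
        # Raised once `timeout` seconds pass with nothing to take
        item = None
    return item, time.monotonic() - start

if __name__ == "__main__":
    q = queue.Queue()
    print(timed_get(q, 0.2))  # empty queue: waits about 0.2 s, then gives up
    q.put("x")
    print(timed_get(q, 0.2))  # item available: returns right away
```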

Example 4: Adding a Timeout to the Blocking Extract, and Handling the queue.Empty Exception from Timeout

We add a timeout to Queue.get(). If no item becomes available within the timeout, get() raises a queue.Empty exception. We surround the timed Queue.get() with a try/except block to handle queue.Empty. With these two changes, the consumer thread should finally stop hanging.

Python
import queue
import threading
import time
import logging

def configure_logging():
    log_format="%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt='%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )

def add_items(processing_queue, num_items, queue_event):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    queue_event.set()
    logging.info("Exiting thread")

def process_items(processing_queue, queue_event):
    while (processing_queue.unfinished_tasks > 0 or not queue_event.is_set()):
        # Blocking get, timeout of 1 second
        try:
            x = processing_queue.get(timeout=1)
            # Processing happens here
            logging.info("Processing item: {}".format(x))
            # simulate processing time. Much slower than insertion
            time.sleep(0.1)
            processing_queue.task_done()
            logging.info("Item: {} processed.".format(x))
            logging.info("Processing queue size: {}. Remaining tasks: {}".format(
                processing_queue.qsize(), processing_queue.unfinished_tasks))
        except queue.Empty:
            logging.debug("Queue empty")
    logging.info("Exiting thread")

def main():
    configure_logging()
    test_queue = queue.Queue(5)

    work_finished_event      = threading.Event()

    # insertion thread
    t_add = threading.Thread(target=add_items, 
            args=(test_queue, 10, work_finished_event), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, 
                args=(test_queue, work_finished_event), name="process_items_thread")
    t_process.start()

    t_add.join()
    t_process.join()
    logging.info("Exiting thread")
   

if __name__ == "__main__":
    main()

Executing the program again results in the following:

Quote:

2020-06-22 18:59:42.371|add_items_thread |Adding item: 0
2020-06-22 18:59:42.376|process_items_thread |Processing item: 0
2020-06-22 18:59:42.478|process_items_thread |Item: 0 processed.
2020-06-22 18:59:42.478|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:43.375|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:43.377|add_items_thread |Adding item: 1
2020-06-22 18:59:43.378|process_items_thread |Processing item: 1
2020-06-22 18:59:43.483|process_items_thread |Item: 1 processed.
2020-06-22 18:59:43.484|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:44.380|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:44.382|add_items_thread |Adding item: 2
2020-06-22 18:59:44.384|process_items_thread |Processing item: 2
2020-06-22 18:59:44.486|process_items_thread |Item: 2 processed.
2020-06-22 18:59:44.487|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:45.386|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:45.388|add_items_thread |Adding item: 3
2020-06-22 18:59:45.389|process_items_thread |Processing item: 3
2020-06-22 18:59:45.501|process_items_thread |Item: 3 processed.
2020-06-22 18:59:45.502|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:46.391|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:46.393|add_items_thread |Adding item: 4
2020-06-22 18:59:46.393|process_items_thread |Processing item: 4
2020-06-22 18:59:46.516|process_items_thread |Item: 4 processed.
2020-06-22 18:59:46.516|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:47.395|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:47.398|add_items_thread |Adding item: 5
2020-06-22 18:59:47.399|process_items_thread |Processing item: 5
2020-06-22 18:59:47.520|process_items_thread |Item: 5 processed.
2020-06-22 18:59:47.520|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:48.419|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:48.419|add_items_thread |Adding item: 6
2020-06-22 18:59:48.419|process_items_thread |Processing item: 6
2020-06-22 18:59:48.522|process_items_thread |Item: 6 processed.
2020-06-22 18:59:48.523|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:49.421|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:49.424|add_items_thread |Adding item: 7
2020-06-22 18:59:49.427|process_items_thread |Processing item: 7
2020-06-22 18:59:49.568|process_items_thread |Item: 7 processed.
2020-06-22 18:59:49.568|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:50.467|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:50.468|add_items_thread |Adding item: 8
2020-06-22 18:59:50.468|process_items_thread |Processing item: 8
2020-06-22 18:59:50.570|process_items_thread |Item: 8 processed.
2020-06-22 18:59:50.571|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:51.484|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:51.496|add_items_thread |Adding item: 9
2020-06-22 18:59:51.497|process_items_thread |Processing item: 9
2020-06-22 18:59:51.621|process_items_thread |Item: 9 processed.
2020-06-22 18:59:51.621|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:52.499|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:52.500|add_items_thread |Exiting thread
2020-06-22 18:59:52.622|process_items_thread |Queue empty
2020-06-22 18:59:52.622|process_items_thread |Exiting thread

2020-06-22 18:59:52.623|MainThread |Exiting thread

<AT THIS POINT THE PROGRAM STOPS PROPERLY. PRODUCER AND CONSUMER THREADS COMPLETE>

As you can see, once the queue stays empty for the whole timeout, get() raises queue.Empty in the consumer thread. The exception is caught, and the consumer thread moves on to the next iteration, where it re-checks the loop condition and, now that the completion event is set, exits. This time, the program stops all threads properly!
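The loop condition that the consumer re-checks after each timeout is easier to reason about in isolation. In this sketch, keep_consuming is a hypothetical helper holding the same expression as the while condition in process_items. Stepping through the states shows that an item taken with get() but not yet acknowledged with task_done() still keeps the consumers running.

```python
import queue
import threading

def keep_consuming(q, production_done):
    # Same expression as the while condition in process_items
    return q.unfinished_tasks > 0 or not production_done.is_set()

def walk_through_states():
    q = queue.Queue()
    production_done = threading.Event()
    states = [keep_consuming(q, production_done)]  # producer still running
    q.put("item")
    production_done.set()
    states.append(keep_consuming(q, production_done))  # producer done, item still queued
    q.get()
    states.append(keep_consuming(q, production_done))  # item taken, task_done() not called yet
    q.task_done()
    states.append(keep_consuming(q, production_done))  # nothing left: consumers may exit
    return states

if __name__ == "__main__":
    print(walk_through_states())
```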

In the general case, we may need more than one consumer thread. Let's see how to add them.

Example 5: Adding Consumer Threads, Adding Thread List

I changed the code to use multiple consumer threads. I created a thread list containing the threads that the program needs to wait for before exiting. We now have 1 producer thread and 5 consumer threads. I made the maximum size of the queue equal to the number of consumer threads, so the queue can hold one pending item per consumer and keep them all busy. The number of consumer threads sets the upper limit on the processing rate. We join all threads, starting with the producer thread (it is first in the list). The consumer threads are now numbered, so we know which thread is doing what.

Python
import queue
import threading
import time
import logging

def configure_logging():
    log_format="%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt='%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )

def add_items(processing_queue, num_items, queue_event):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
        logging.info("Processing queue size: {}. Remaining tasks: {}".format
                    (processing_queue.qsize(), processing_queue.unfinished_tasks))
    queue_event.set()
    logging.info("Exiting thread")

def process_items(processing_queue, queue_event):
    while (processing_queue.unfinished_tasks > 0 or not queue_event.is_set()):
        # Blocking get, timeout of 1 second
        try:
            x = processing_queue.get(timeout=1)
            # Processing happens here
            logging.info("Processing item: {}".format(x))
            # simulate processing time. Much slower than insertion
            time.sleep(0.1)
            processing_queue.task_done()
            logging.info("Item: {} processed.".format(x))
            logging.info("Processing queue size: {}. Remaining tasks: {}".format
                        (processing_queue.qsize(), processing_queue.unfinished_tasks))
        except queue.Empty:
            logging.debug("Queue empty")
    logging.info("Exiting thread")

def main():
    max_queue_size = 5
    configure_logging()
    test_queue = queue.Queue(max_queue_size)

    work_finished_event      = threading.Event()
    thread_list = list()

    # insertion thread
    t_add = threading.Thread(target=add_items, 
            args=(test_queue, 10, work_finished_event), name="add_items_thread")
    t_add.start()
    thread_list.append(t_add)

    # processing thread (new)
    for i in range(max_queue_size):
        t_process = threading.Thread(target=process_items, 
        args=(test_queue, work_finished_event), 
        name="process_items_thread_{0:03d}".format(i))
        t_process.start()
        thread_list.append(t_process)

    for t in thread_list:
        t.join()

    logging.info("Exiting thread")

if __name__ == "__main__":
    main()

The result of the execution shows:

Quote:

2020-06-22 19:00:27.598|add_items_thread |Adding item: 0
2020-06-22 19:00:27.600|process_items_thread_000 |Processing item: 0
2020-06-22 19:00:27.703|process_items_thread_000 |Item: 0 processed.
2020-06-22 19:00:27.704|process_items_thread_000 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:28.603|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:28.604|add_items_thread |Adding item: 1
2020-06-22 19:00:28.605|process_items_thread_002 |Processing item: 1
2020-06-22 19:00:28.709|process_items_thread_002 |Item: 1 processed.
2020-06-22 19:00:28.709|process_items_thread_002 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:29.606|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:29.606|add_items_thread |Adding item: 2
2020-06-22 19:00:29.607|process_items_thread_004 |Processing item: 2
2020-06-22 19:00:29.709|process_items_thread_004 |Item: 2 processed.
2020-06-22 19:00:29.712|process_items_thread_004 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:30.608|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:30.610|add_items_thread |Adding item: 3
2020-06-22 19:00:30.611|process_items_thread_000 |Processing item: 3
2020-06-22 19:00:30.715|process_items_thread_000 |Item: 3 processed.
2020-06-22 19:00:30.716|process_items_thread_000 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:31.613|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:31.615|add_items_thread |Adding item: 4
2020-06-22 19:00:31.616|process_items_thread_002 |Processing item: 4
2020-06-22 19:00:31.752|process_items_thread_002 |Item: 4 processed.
2020-06-22 19:00:31.752|process_items_thread_002 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:32.617|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:32.617|add_items_thread |Adding item: 5
2020-06-22 19:00:32.618|process_items_thread_004 |Processing item: 5
2020-06-22 19:00:32.720|process_items_thread_004 |Item: 5 processed.
2020-06-22 19:00:32.721|process_items_thread_004 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:33.619|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:33.619|add_items_thread |Adding item: 6
2020-06-22 19:00:33.620|process_items_thread_000 |Processing item: 6
2020-06-22 19:00:33.722|process_items_thread_000 |Item: 6 processed.
2020-06-22 19:00:33.723|process_items_thread_000 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:34.621|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:34.623|add_items_thread |Adding item: 7
2020-06-22 19:00:34.630|process_items_thread_004 |Processing item: 7
2020-06-22 19:00:34.759|process_items_thread_004 |Item: 7 processed.
2020-06-22 19:00:34.763|process_items_thread_004 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:35.656|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:35.661|add_items_thread |Adding item: 8
2020-06-22 19:00:35.663|process_items_thread_000 |Processing item: 8
2020-06-22 19:00:35.823|process_items_thread_000 |Item: 8 processed.
2020-06-22 19:00:35.828|process_items_thread_000 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:36.709|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:36.712|add_items_thread |Adding item: 9
2020-06-22 19:00:36.713|process_items_thread_002 |Processing item: 9
2020-06-22 19:00:36.852|process_items_thread_002 |Item: 9 processed.
2020-06-22 19:00:36.853|process_items_thread_002 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:37.715|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:37.720|add_items_thread |Exiting thread
2020-06-22 19:00:37.767|process_items_thread_004 |Exiting thread
2020-06-22 19:00:37.832|process_items_thread_000 |Exiting thread
2020-06-22 19:00:37.854|process_items_thread_002 |Exiting thread
2020-06-22 19:00:38.605|process_items_thread_001 |Exiting thread
2020-06-22 19:00:38.611|process_items_thread_003 |Exiting thread

2020-06-22 19:00:38.681|MainThread |Exiting thread

<AT THIS POINT THE PROGRAM STOPS PROPERLY. PRODUCER AND CONSUMER THREADS COMPLETE>

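Because each consumer exits only when the unfinished_tasks count is zero and the producer's completion Event is set, all the consumer threads keep trying until no task is queued or being processed and the completion event has been set. The two consumers that never received an item (001 and 003) exit up to a second later than the others, because they were waiting in get(timeout=1) when the event was set.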
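To check this termination behaviour without waiting ten seconds, here is a scaled-down sketch of Example 5 with much shorter sleeps and timeouts. The function run_pipeline and the processed list are hypothetical additions, not part of the article's program; they record which consumer handled each item so we can confirm that every item is processed exactly once and that all threads finish.

```python
import queue
import threading
import time

def run_pipeline(num_items, num_consumers, maxsize=5):
    """Scaled-down Example 5: returns a list of (consumer thread name, item)."""
    q = queue.Queue(maxsize)
    production_done = threading.Event()
    processed = []
    processed_lock = threading.Lock()  # protects `processed` across consumers

    def add_items():
        for x in range(num_items):
            q.put(x)
            time.sleep(0.01)  # much faster than the article's 1 second
        production_done.set()

    def process_items():
        while q.unfinished_tasks > 0 or not production_done.is_set():
            try:
                x = q.get(timeout=0.05)
            except queue.Empty:
                continue  # go back and re-check the loop condition
            with processed_lock:
                processed.append((threading.current_thread().name, x))
            q.task_done()

    # Producer first in the list, as in the article
    threads = [threading.Thread(target=add_items, name="add_items_thread")]
    threads += [threading.Thread(target=process_items,
                                 name="process_items_thread_{0:03d}".format(i))
                for i in range(num_consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return processed

if __name__ == "__main__":
    for name, item in run_pipeline(10, 5):
        print(name, item)
```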

Example 6: Adding a Monitoring Thread

I added a monitoring thread that reports the number of unfinished tasks, which lets us remove some of the logging clutter from the consumer and producer threads. Like the consumers, it keeps running until the completion event is set and no tasks remain. The monitoring thread logs the queue size and the remaining task count every 5 seconds.

Python
import queue
import threading
import time
import logging

def configure_logging():
    log_format="%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt='%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )

def monitor_queue(processing_queue, queue_event):
    while (processing_queue.unfinished_tasks > 0 or not queue_event.is_set()):
        check_period = 5
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
        # Sleep until the next multiple of check_period seconds on the clock
        time.sleep(check_period - divmod(time.time(), check_period)[1])
    logging.info("Exiting thread. Remaining tasks: {}".format(
        processing_queue.unfinished_tasks))

def add_items(processing_queue, num_items, queue_event):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
    queue_event.set()
    logging.info("Exiting thread")

def process_items(processing_queue, queue_event):
    while (processing_queue.unfinished_tasks > 0 or not queue_event.is_set()):
        # Blocking get, timeout of 1 second
        try:
            x = processing_queue.get(timeout=1)
            # Processing happens here
            logging.info("Processing item: {}".format(x))
            # simulate processing time. Much slower than insertion
            time.sleep(0.1)
            processing_queue.task_done()
            logging.info("Item: {} processed.".format(x))
        except queue.Empty:
            logging.debug("Queue empty")
    logging.info("Exiting thread")

def main():
    max_queue_size = 5
    configure_logging()
    test_queue = queue.Queue(max_queue_size)

    work_finished_event      = threading.Event()
    thread_list = list()

    # monitoring thread
    t_monitor = threading.Thread(target=monitor_queue, 
                args=(test_queue, work_finished_event), name="monitor_queue_thread")
    t_monitor.start()
    thread_list.append(t_monitor)

    # insertion thread
    t_add = threading.Thread(target=add_items, 
            args=(test_queue, 10, work_finished_event), name="add_items_thread")
    t_add.start()
    thread_list.append(t_add)

    # processing thread (new)
    for i in range(max_queue_size):
        t_process = threading.Thread(target=process_items, 
        args=(test_queue, work_finished_event), name="process_items_thread_{0:03d}".format(i))
        t_process.start()
        thread_list.append(t_process)

    for t in thread_list:
        t.join()   
    
    logging.info("Exiting thread")

if __name__ == "__main__":
    main()

Resulting logs:

Quote:

2020-06-22 19:01:25.566|monitor_queue_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:01:25.568|add_items_thread |Adding item: 0
2020-06-22 19:01:25.572|process_items_thread_000 |Processing item: 0
2020-06-22 19:01:25.673|process_items_thread_000 |Item: 0 processed.
2020-06-22 19:01:26.573|add_items_thread |Adding item: 1
2020-06-22 19:01:26.575|process_items_thread_002 |Processing item: 1
2020-06-22 19:01:26.680|process_items_thread_002 |Item: 1 processed.
2020-06-22 19:01:27.577|add_items_thread |Adding item: 2
2020-06-22 19:01:27.578|process_items_thread_004 |Processing item: 2
2020-06-22 19:01:27.679|process_items_thread_004 |Item: 2 processed.
2020-06-22 19:01:28.579|add_items_thread |Adding item: 3
2020-06-22 19:01:28.579|process_items_thread_000 |Processing item: 3
2020-06-22 19:01:28.680|process_items_thread_000 |Item: 3 processed.
2020-06-22 19:01:29.580|add_items_thread |Adding item: 4
2020-06-22 19:01:29.582|process_items_thread_004 |Processing item: 4
2020-06-22 19:01:29.685|process_items_thread_004 |Item: 4 processed.
2020-06-22 19:01:30.000|monitor_queue_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:01:30.583|add_items_thread |Adding item: 5
2020-06-22 19:01:30.583|process_items_thread_002 |Processing item: 5
2020-06-22 19:01:30.686|process_items_thread_002 |Item: 5 processed.
2020-06-22 19:01:31.584|add_items_thread |Adding item: 6
2020-06-22 19:01:31.584|process_items_thread_000 |Processing item: 6
2020-06-22 19:01:31.686|process_items_thread_000 |Item: 6 processed.
2020-06-22 19:01:32.586|add_items_thread |Adding item: 7
2020-06-22 19:01:32.586|process_items_thread_004 |Processing item: 7
2020-06-22 19:01:32.688|process_items_thread_004 |Item: 7 processed.
2020-06-22 19:01:33.587|add_items_thread |Adding item: 8
2020-06-22 19:01:33.588|process_items_thread_000 |Processing item: 8
2020-06-22 19:01:33.691|process_items_thread_000 |Item: 8 processed.
2020-06-22 19:01:34.590|add_items_thread |Adding item: 9
2020-06-22 19:01:34.590|process_items_thread_004 |Processing item: 9
2020-06-22 19:01:34.694|process_items_thread_004 |Item: 9 processed.
2020-06-22 19:01:35.003|monitor_queue_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:01:35.594|add_items_thread |Exiting thread
2020-06-22 19:01:35.691|process_items_thread_002 |Exiting thread
2020-06-22 19:01:35.691|process_items_thread_000 |Exiting thread
2020-06-22 19:01:35.695|process_items_thread_004 |Exiting thread
2020-06-22 19:01:36.579|process_items_thread_001 |Exiting thread
2020-06-22 19:01:36.581|process_items_thread_003 |Exiting thread
2020-06-22 19:01:40.001|monitor_queue_thread |Exiting thread. Remaining tasks: 0
2020-06-22 19:01:40.004|MainThread |Exiting thread
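The sleep expression in monitor_queue, check_period - divmod(time.time(), check_period)[1], makes the monitor wake up on multiples of check_period seconds on the wall clock. That is why, after its first report, its log lines fall at :30.000, :35.003 and :40.001. A small worked version of that arithmetic (seconds_until_next_tick is a hypothetical name):

```python
import time

def seconds_until_next_tick(now, period):
    """Same arithmetic as monitor_queue: time left until the next multiple of `period`."""
    elapsed_in_period = divmod(now, period)[1]  # same value as now % period
    return period - elapsed_in_period

if __name__ == "__main__":
    print(seconds_until_next_tick(time.time(), 5))
```

For example, at t = 12.0 with a 5 second period it returns 3.0, so the next wake-up is at 15.0. Note that when now is already an exact multiple, it returns the full period rather than 0.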

Recap

I continued improving the program from the previous article. It now stops properly once the producer has finished and no more items are available. It uses multiple consumer threads, and I also improved the logging and cleaned up the program a little.

What Now?

In the next article, I will increase the number of producer threads and explain how to structure the program.

History

  • 21st August, 2020: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)