
TPL: Producer Consumer Pattern - Thread Safe Queue Collection

24 Jul 2016 | CPOL | 4 min read
Solving Producer Consumer Problem - Thread Safe Data Access for Shared Data

Introduction

The purpose of this article is to demonstrate the use of a thread-safe queue collection, ConcurrentQueue, in a Producer-Consumer scenario.


The Producer is responsible for sending messages, typically via a queue, and the Consumer is responsible for processing those messages and removing them from the queue, one item at a time.

This article focuses on Queue and ConcurrentQueue as the mechanism through which the Producer sends messages and the Consumer processes them.

In my next article, I will demonstrate how to resolve the typical Producer-Consumer problems. Here we focus on thread-safe data access for the Producer and Consumer.

Background

The Producer generates a series of tasks, each carrying some data and an associated action; the Consumer receives those tasks and performs the work associated with each one.

Let's assume that the task is to reverse a randomly generated string and that we have two actors: the Producer, which creates several random strings, and the Consumer, which consumes that data and reverses each string.
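To make the task concrete, here is a minimal sketch of the two pieces of work involved: generating a random string and reversing it. The class name `StringWork` and its method names are hypothetical and not part of the attached solution; the alphabet matches the one used later in Program.cs.

```csharp
using System;

public static class StringWork
{
    // Same character set the article's producer draws from.
    private const string Alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

    // Builds a random string of the requested length from the alphabet above.
    public static string RandomString(Random random, int length)
    {
        char[] chars = new char[length];
        for (int i = 0; i < length; i++)
        {
            chars[i] = Alphabet[random.Next(Alphabet.Length)];
        }
        return new string(chars);
    }

    // Returns the characters of the input in reverse order.
    public static string Reverse(string input)
    {
        char[] chars = input.ToCharArray();
        Array.Reverse(chars);
        return new string(chars);
    }
}
```

In this sketch the Producer would call `StringWork.RandomString(random, 5)` and the Consumer would call `StringWork.Reverse(...)` on the result.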

The .NET Framework provides several collections we could use for this, including the following:

  • Queue
  • ConcurrentQueue
  • BlockingCollection

Queue is a typical FIFO collection with Enqueue, Dequeue and Peek methods.

  • Enqueue adds a new item to the end of the Queue
  • Dequeue removes and returns the object at the beginning of the Queue (it throws InvalidOperationException if the Queue is empty)
  • Peek returns the object at the beginning of the Queue without removing it (it also throws InvalidOperationException if the Queue is empty)
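The three methods above can be sketched on a single thread as follows. `QueueBasics` and `Run` are illustrative names, not part of the attached solution.

```csharp
using System;
using System.Collections.Generic;

public static class QueueBasics
{
    // Exercises Enqueue, Peek and Dequeue and reports what each returned.
    public static string Run()
    {
        var queue = new Queue<string>();
        queue.Enqueue("first");
        queue.Enqueue("second");

        string peeked = queue.Peek();       // looks at the head, leaves it in place
        string dequeued = queue.Dequeue();  // removes and returns the head

        return peeked + "," + dequeued + "," + queue.Count;
    }

    // Dequeue on an empty Queue<T> throws rather than returning a default value.
    public static bool DequeueFromEmptyThrows()
    {
        try
        {
            new Queue<string>().Dequeue();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

`QueueBasics.Run()` returns `"first,first,1"`: Peek and Dequeue both see the oldest item, and only Dequeue removes it.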

ConcurrentQueue is also a FIFO structure, with Enqueue, TryDequeue and TryPeek methods.

  • Enqueue adds a new item to the end of the ConcurrentQueue
  • TryDequeue tries to remove and return the object at the beginning of the ConcurrentQueue; it returns false if the queue is empty
  • TryPeek tries to return the object at the beginning of the ConcurrentQueue without removing it; it returns false if the queue is empty
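The Try-style methods report failure through their return value instead of throwing. A small single-threaded sketch (the class and method names are illustrative only):

```csharp
using System.Collections.Concurrent;

public static class ConcurrentQueueBasics
{
    // Returns the results of TryPeek, TryDequeue and a second TryDequeue.
    public static string Run()
    {
        var queue = new ConcurrentQueue<string>();
        queue.Enqueue("first");

        string item;
        bool peeked = queue.TryPeek(out item);       // succeeds, item stays queued
        bool dequeued = queue.TryDequeue(out item);  // succeeds, item is removed
        bool again = queue.TryDequeue(out item);     // queue is now empty, so this fails

        return peeked + "," + dequeued + "," + again;
    }
}
```

`ConcurrentQueueBasics.Run()` returns `"True,True,False"`; the final call fails quietly rather than raising an exception.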

 

So what is special about ConcurrentQueue? It is a thread-safe collection.

Problem: Do we need thread-safe access to shared data? The answer is YES.

In the Producer-Consumer scenario, a collection of messages is processed by the Consumer. We can have more than one Consumer (thread), and they can operate in parallel at the same time. That means one Consumer may try to access a message that another Consumer has already processed or removed from the collection.

Queue is a FIFO collection, but it has no built-in support for thread-safe data access. We can achieve thread safety with a lock, but ConcurrentQueue provides it as a built-in feature.

With a plain Queue, a consumer thread can try to process a task that another thread has already handled and removed from the queue. Because access by multiple Consumers (threads) is not synchronized, a Consumer can fail to dequeue a task even though the task was present in the queue when that Consumer entered its while loop.
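As a rough illustration of the lock-based alternative, the sketch below wraps a Queue in a class that takes a lock around every operation. `LockedQueue<T>` is a hypothetical helper and not part of the attached solution. The emptiness check and the Dequeue happen under the same lock, so no other thread can remove the item in between.

```csharp
using System.Collections.Generic;

public class LockedQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _sync = new object();

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.Enqueue(item);
        }
    }

    // Check-and-remove as one atomic step; returns false when the queue is empty.
    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            if (_queue.Count > 0)
            {
                item = _queue.Dequeue();
                return true;
            }
            item = default(T);
            return false;
        }
    }
}
```

This gives the same calling pattern as ConcurrentQueue's TryDequeue, at the cost of writing and maintaining the locking yourself.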

 

ConcurrentQueue is designed for concurrent access (its implementation is largely lock-free), so it gives us thread-safe data access without any locking code of our own. The code listings in the following sections show how to use ConcurrentQueue in the Producer-Consumer scenario.
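Before the full listings, here is a reduced sketch of the property we rely on: when two consumers drain the same ConcurrentQueue, each item is handed to exactly one of them. The class `TwoConsumersDemo` is illustrative only, and unlike the attached solution it fills the queue up front so that the consumers can stop once the queue is empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class TwoConsumersDemo
{
    // Returns the number of distinct items processed,
    // or -1 if the total processed count does not match itemCount.
    public static int Run(int itemCount)
    {
        var queue = new ConcurrentQueue<int>();
        for (int i = 0; i < itemCount; i++)
        {
            queue.Enqueue(i);
        }

        // Thread-safe bag recording every item a consumer handled.
        var processed = new ConcurrentBag<int>();

        Action consume = () =>
        {
            int item;
            while (queue.TryDequeue(out item))
            {
                processed.Add(item);
            }
        };

        // Two consumers compete for the same queue.
        Task.WaitAll(Task.Run(consume), Task.Run(consume));

        if (processed.Count != itemCount)
        {
            return -1; // an item was lost or handled twice
        }
        return processed.Distinct().Count();
    }
}
```

If every item is dequeued exactly once, `TwoConsumersDemo.Run(n)` returns `n`.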

About Code

The attached VS 2015 solution contains:

  • QueuedObject.cs
    • This class holds data such as ConsumerThreadID, ProducerThreadID, RandomString, etc. and is queued by the Producer (in either the Queue collection or the ConcurrentQueue collection).
  • Program.cs has
    • A Producer thread that generates tasks and adds them to the selected queue with an associated action.
    • Two Consumer threads that read the selected queue, extract each task and perform its associated action, i.e. reverse the string.
  • QueueService.cs has
    • An implementation based on the .NET Queue collection, which enqueues and dequeues the tasks.
  • ConcurrentQueueService.cs has
    • An implementation based on the .NET ConcurrentQueue collection, which enqueues and dequeues the tasks.

Please make sure to comment and uncomment the Queue and ConcurrentQueue calls in both the ProduceTasks and ConsumeTasks methods. For example, to demonstrate the shared-data access problem with Queue, comment out the ConcurrentQueueService calls in ProduceTasks and ConsumeTasks in Program.cs and uncomment the QueueService calls.

QueuedObject.cs:

C#
using System;

// Data carried by each queued task.
public class QueuedObject
{
    public int QueueID { get; set; }
    public int ConsumerThreadID { get; set; }
    public int ProducerThreadID { get; set; }
    public string RandomString { get; set; }
    public DateTime EnqueueDateTime { get; set; }
    public DateTime DequeueDateTime { get; set; }
}

QueueService.cs

C#
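using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Wraps the non-thread-safe Queue<T>. Access is left unsynchronized on purpose
// so that the shared-data access problem can be observed.
public static class QueueService
{
    static Queue<Task> _queue;

    static QueueService() { _queue = new Queue<Task>(); }

    // Wraps the action in a (not yet started) Task and adds it to the queue.
    public static void Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken))
    {
        Task task = new Task(action, cancelToken);
        _queue.Enqueue(task);
    }

    // Loops forever, running each dequeued task on the calling thread.
    public static void Dequeue()
    {
        while (true)
        {
            try
            {
                // Dequeue throws InvalidOperationException when the queue is empty.
                // With several unsynchronized consumers the outcome is unpredictable;
                // failures are caught and logged below.
                Task task = _queue.Dequeue();
                task.RunSynchronously();
            }
            catch (NullReferenceException ex)
            {
                Debug.WriteLine(ex.Message);
            }
            catch (Exception ex)
            {
                Debug.WriteLine(ex.Message);
            }
        }
    }
}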

ConcurrentQueueService.cs

C#
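using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Same shape as QueueService, but backed by the thread-safe ConcurrentQueue<T>.
static class ConcurrentQueueService
{
    static ConcurrentQueue<Task> _queue;

    static ConcurrentQueueService()
    {
        _queue = new ConcurrentQueue<Task>();
    }

    // Wraps the action in a (not yet started) Task and adds it to the queue.
    public static void Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken))
    {
        Task task = new Task(action, cancelToken);
        _queue.Enqueue(task);
    }

    // Loops forever, running each dequeued task on the calling thread.
    public static void Dequeue()
    {
        while (true)
        {
            try
            {
                Task task;
                // TryDequeue returns false instead of throwing when the queue is empty.
                if (_queue.TryDequeue(out task)) { task.RunSynchronously(); }
            }
            catch (NullReferenceException ex)
            {
                Debug.WriteLine(ex.Message);
            }
            catch (Exception ex)
            {
                Debug.WriteLine(ex.Message);
            }
        }
    }
}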

Program.cs

C#
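using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        // One producer and two consumers run in parallel.
        var t1 = Task.Factory.StartNew(() => ProduceTasks());
        var t2 = Task.Factory.StartNew(() => ConsumeTasks());
        var t3 = Task.Factory.StartNew(() => ConsumeTasks());
        // The consumers loop forever, so WaitAll does not return and the
        // ReadLine below is not reached; close the console to stop the demo.
        Task.WaitAll(t1, t2, t3);
        Console.ReadLine();
    }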

    public static void ProcessQueue(QueuedObject queue)
    {
        string reversedString = new string(Enumerable.Range(1, queue.RandomString.Length).Select(i => queue.RandomString[queue.RandomString.Length - i]).ToArray());

        Console.WriteLine
            (
            "Dequeued: " + queue.QueueID +
            "\t" + "Consumer ThreadID :" + Thread.CurrentThread.ManagedThreadId +
            "\t" + DateTime.Now.ToLongTimeString() +
            "\t" + "ReversedString :" + reversedString
            );
    }

    public static void ProduceTasks()
    {
        Random random = new Random();
        for (int i = 1; i <= 100000; i++)
        {
            var queue = new QueuedObject
            {
                QueueID = i,
                ProducerThreadID = Thread.CurrentThread.ManagedThreadId,
                EnqueueDateTime = DateTime.Now,
                // Used to Generate Random String
                RandomString = new string(Enumerable.Repeat("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 5).Select(s => s[random.Next(s.Length)]).ToArray())
            };

            #region Queueing using Queue Collection
            // Uncomment QueueService.Enqueue(() => { ProcessQueue(queue); }) to use .Net Queue Object to Queue Tasks.
            // And Comment ConcurrentQueueService.Enqueue
            // QueueService.Enqueue(() => { ProcessQueue(queue); });
            #endregion

            #region Queueing using ConcurrentQueue
            ConcurrentQueueService.Enqueue(() => { ProcessQueue(queue); });
            #endregion

            Console.WriteLine
                (
                "Enqueued: " + queue.QueueID +
                "\t" + "Producer ThreadID :" + queue.ProducerThreadID +
                "\t" + queue.EnqueueDateTime.ToLongTimeString() +
                "\t" + "RandomString   :" + queue.RandomString
                );
        }
    }

    public static void ConsumeTasks()
    {
        //QueueService.Dequeue();
        ConcurrentQueueService.Dequeue();
    }
}

 

The attached code demonstrates both the Queue and the ConcurrentQueue implementations in the Producer-Consumer scenario; one Producer generates the tasks, and two Consumers process the tasks received in the queue.

Points of Interest: Some More Producer-Consumer Problems (I will address these in the next version of this article)

  • The Producer should stop adding data, or go to sleep, when the queue is full; when a Consumer removes data, it should notify the Producer so that the Producer can start adding data again.

  • The Consumer should go to sleep when there is no data available to process and wake up when data becomes available.
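As a preview of one possible approach (not necessarily the one the follow-up article will take), .NET's BlockingCollection, listed earlier as an option, covers both points: with a bounded capacity, Add blocks while the collection is full, and GetConsumingEnumerable blocks while it is empty. The class name `BoundedPipeline` and its parameters are illustrative assumptions.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    // Pushes itemCount integers through a buffer holding at most `capacity`
    // items and returns how many the consumer received.
    public static int Run(int itemCount, int capacity)
    {
        int consumed = 0;
        using (var buffer = new BlockingCollection<int>(capacity))
        {
            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < itemCount; i++)
                {
                    buffer.Add(i); // blocks while the buffer is full
                }
                buffer.CompleteAdding(); // tells the consumer no more items will arrive
            });

            Task consumer = Task.Run(() =>
            {
                // Blocks while the buffer is empty; the loop ends once adding is
                // complete and the buffer has been drained.
                foreach (int item in buffer.GetConsumingEnumerable())
                {
                    consumed++; // single consumer, so no extra synchronization needed
                }
            });

            Task.WaitAll(producer, consumer);
        }
        return consumed;
    }
}
```

For example, `BoundedPipeline.Run(1000, 10)` moves 1000 items through a buffer that never holds more than 10 at a time, with neither side busy-waiting. Unlike the `while (true)` loops in the listings above, the consumer here also terminates once the Producer signals completion.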

 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)