Awaitable Concurrent Priority Queue

29 Dec 2017, CPOL, 5 min read
An optionally awaitable, simple-to-use concurrent priority queue.

Introduction

A general priority queue is a vast and rather thankless topic to cover. Those who know advanced data structures first recall the computational complexity of the usual implementations; those who do not are misled by the word "queue" into expecting FIFO behavior, yet a priority queue is more complex than a FIFO queue.

To simplify things right away: it is not my intention to cover the general priority queue, but rather an implementation with a limited number of distinct, strongly typed priority levels. Carrying this restriction down into the implementation brings an interesting reduction in code complexity and, more importantly, in the time complexity of the push/pop (enqueue/dequeue) operations.
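To illustrate why a fixed set of priority levels keeps both the code and the complexity small, here is a minimal sketch, assuming one ConcurrentQueue per level. FixedLevelQueueSketch, SketchPriority, and FixedLevelQueueDemo are hypothetical names used only for illustration; this is not the PriorityQueueUC implementation from the library, which may work differently.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration only; NOT the PriorityQueueUC implementation.
public sealed class FixedLevelQueueSketch<TPriority, TItem> where TPriority : struct
{
    // Scan order, highest priority first.
    private readonly TPriority[] _descending;

    // One thread-safe FIFO queue per priority level.
    // The dictionary is never modified after construction, so concurrent reads are safe.
    private readonly Dictionary<TPriority, ConcurrentQueue<TItem>> _levels;

    public FixedLevelQueueSketch(IEnumerable<TPriority> descendingPriorities)
    {
        _descending = descendingPriorities.ToArray();
        _levels = _descending.ToDictionary(p => p, _ => new ConcurrentQueue<TItem>());
    }

    // One lookup plus one thread-safe enqueue, independent of the item count.
    public void Enqueue(TPriority priority, TItem item) => _levels[priority].Enqueue(item);

    // Scans at most k queues, where k is the fixed number of priority levels.
    public bool TryDequeue(out TItem item)
    {
        foreach (TPriority p in _descending)
        {
            if (_levels[p].TryDequeue(out item)) return true;
        }
        item = default(TItem);
        return false;
    }
}

public enum SketchPriority { Lower, Normal, Higher }

public static class FixedLevelQueueDemo
{
    public static void Main()
    {
        var queue = new FixedLevelQueueSketch<SketchPriority, string>(
            new[] { SketchPriority.Higher, SketchPriority.Normal, SketchPriority.Lower });

        queue.Enqueue(SketchPriority.Lower, "low");
        queue.Enqueue(SketchPriority.Higher, "high");
        queue.Enqueue(SketchPriority.Normal, "normal");

        string item;
        while (queue.TryDequeue(out item))
        {
            Console.WriteLine(item); // prints high, normal, low (one per line)
        }
    }
}
```

With k fixed levels, an enqueue costs one lookup and a dequeue scans at most k queues regardless of how many items are stored, whereas a heap-based general priority queue typically pays O(log n) per operation.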

The priority queue implementation presented here is a good choice for cases like bandwidth management, command execution prioritization, and priority-based merging of events from distinct sources; simply put, cases where the priority levels are known during development. The ability to thrive in a heavily concurrent environment with many threads is the cherry on the cake.

It is not the right choice for classic general priority queue applications, where priorities are usually not known during development, such as discrete event simulation, Dijkstra's algorithm, Huffman coding, etc.

Requirements

  • lock-free,
  • highly concurrent,
  • generic in stored item type,
  • generic in priority type, but constrained to priorities represented by a .NET enum (strongly typed priority),
  • explicitly defined descending order of priorities during construction,
  • ability to get the total item count and the item count per priority level,
  • ability to dequeue in descending order of priorities,
  • ability to override the dequeue priority level,
  • potentially awaitable,
  • potentially awaitable per priority.

These requirements yield a restricted priority queue with the ability to control item counts per priority level, which helps to avoid overflow issues.

Implementation

The implementation is divided into two classes. The first class, PriorityQueueUC, covers the concurrent priority queue without awaitable functionality. The second class, PriorityQueueNotifierUC, inherits from PriorityQueueUC and extends it to enable awaiting enqueued data. The awaitable part lives in its own class for performance reasons: it adds an extra synchronization layer, which is unnecessary in some cases.

Both implementations require an enum representing priority:

C#
public enum QueuePriority
{
    Lower,
    Normal,
    Higher
}

Both implementations require the order of priorities to be defined explicitly, in descending order!

This has to be done explicitly because reflection does not guarantee that the enum members are returned in the same order in which they are declared in the enum block.
We cannot depend on the numeric values of the enum members either, as they can be assigned explicitly!

The constructors take the descending order of priorities as IEnumerable<TPriority>, so we can use an array, for example.

C#
QueuePriority[] descendingPriorities =
{
    QueuePriority.Higher,
    QueuePriority.Normal,
    QueuePriority.Lower
};
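As a small illustration of why numeric enum values are not a reliable source of priority order, consider the following sketch. SkewedPriority and EnumOrderSketch are hypothetical names invented for this example; the explicit values are deliberately chosen so that the numeric order disagrees with the intended one.

```csharp
using System;
using System.Linq;

// Hypothetical enum whose explicit values do not follow the intended priority order.
public enum SkewedPriority { Lower = 10, Normal = 0, Higher = 5 }

public static class EnumOrderSketch
{
    // Orders the members by their underlying numeric value, highest first.
    public static SkewedPriority[] ByNumericValueDescending()
    {
        return Enum.GetValues(typeof(SkewedPriority))
                   .Cast<SkewedPriority>()
                   .OrderByDescending(p => (int)p)
                   .ToArray();
    }

    // The order the developer actually intends, stated explicitly.
    public static readonly SkewedPriority[] Intended =
    {
        SkewedPriority.Higher, SkewedPriority.Normal, SkewedPriority.Lower
    };

    public static void Main()
    {
        // Prints: Lower, Higher, Normal
        Console.WriteLine(string.Join(", ", ByNumericValueDescending().Select(p => p.ToString())));
        // Prints: Higher, Normal, Lower
        Console.WriteLine(string.Join(", ", Intended.Select(p => p.ToString())));
    }
}
```

Deriving the order from the values would put Lower first here, which is why passing the descending order explicitly is the safer contract.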

Thanks to inheritance, both classes implement this interface:

C#
public interface IPriorityQueueUC<TPriority, TItem> where TPriority : struct
{
    void Enqueue(TPriority priority, TItem item);
    void Enqueue(TPriority priority, IList<TItem> items);
    void Enqueue(TPriority priority, IEnumerable<TItem> items);
    bool TryDequeu(out TItem item, TPriority? priority = null);
    bool TryDequeu(out TItem item, out TPriority? priority, TPriority? requestedPriority = null);
    int Count(TPriority? requestedPriority = null);
    bool HasItems(TPriority? requestedPriority = null);
}

Enqueue

The operation is straightforward: it takes a TPriority and the item(s) as TItem, IList<TItem>, or IEnumerable<TItem>, to ensure efficient loading from different collections.

TryDequeu

bool TryDequeu(out TItem item, TPriority? priority = null);
Returns true if an item was dequeued; the item is returned in out TItem. If the second parameter, TPriority?, is null, the priority levels are searched in descending order until an item is found; if none is found, the method returns false.
If the second parameter is not null, the descending order is overridden and the requested priority is used.

bool TryDequeu(out TItem item, out TPriority? priority, TPriority? requestedPriority = null);
Returns true if an item was dequeued; the item is returned in out TItem and its priority in out TPriority?. The descending order of priorities can again be overridden with a requested TPriority.


Count

The Count operation returns the number of items in the whole collection or, if TPriority? is specified, the number of items at that specific priority.
The returned value is only a snapshot: concurrently performed enqueues and dequeues can change it at any moment.
Please keep in mind that Count and HasItems behave very much like Count of the .NET ConcurrentQueue.

HasItems
HasItems returns true or false for the whole collection or for the requested priority.
The returned value is only a snapshot: concurrently performed enqueues and dequeues can change it at any moment.
Please keep in mind that Count and HasItems behave very much like Count of the .NET ConcurrentQueue.
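The snapshot nature of such counters can be sketched with the standard .NET ConcurrentQueue<T>, which the text above uses as the point of comparison. SnapshotCountSketch and RaceForSingleItem are hypothetical names for this illustration; the priority queue itself is not used here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class SnapshotCountSketch
{
    // Two consumers race for a single item. Count is treated only as a hint;
    // the result of TryDequeue is the sole authority on who got the item.
    public static int RaceForSingleItem()
    {
        var queue = new ConcurrentQueue<string>();
        queue.Enqueue("only item");

        int successes = 0;
        Action consumer = () =>
        {
            string item;
            // Count may already be stale by the time TryDequeue runs,
            // so a positive Count does not guarantee a successful dequeue.
            if (queue.Count > 0 && queue.TryDequeue(out item))
            {
                Interlocked.Increment(ref successes);
            }
        };

        Parallel.Invoke(consumer, consumer);
        return successes;
    }

    public static void Main()
    {
        Console.WriteLine(RaceForSingleItem()); // prints 1
    }
}
```

Both consumers may observe Count == 1, yet exactly one of them dequeues the item. Code that consumes Count or HasItems should therefore always be prepared for the following dequeue attempt to fail.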

Construction

Construction is simple with both versions of classes:

C#
public enum QueuePriority { Lower, Normal, Higher }

QueuePriority[] descendingPriorities = { QueuePriority.Higher, QueuePriority.Normal, QueuePriority.Lower };

var pQueue = new PriorityQueueUC<QueuePriority, string>(descendingPriorities);
var pQueueNotifier = new PriorityQueueNotifierUC<QueuePriority, string>(descendingPriorities);

 

Awaitable Concurrent Priority Queue

The difference in the awaitable version is that it extends the original interface IPriorityQueueUC<TPriority, TItem>:

C#
public interface IPriorityQueueNotifierUC<TPriority, TItem> : IPriorityQueueUC<TPriority, TItem>
where TPriority: struct
{
       ICompletionUC EnqueuedItemsAsync();
       ICompletionUC EnqueuedItemsAsync(TPriority priority);
}

The return type of EnqueuedItemsAsync is ICompletionUC, an interface-based .NET awaitable.
ICompletionUC allows awaiting the presence of items in the priority queue, either across all priorities in descending order or for a specific priority.

The result of the EnqueuedItemsAsync operation is only the information that there could be some item(s) in the priority queue, across all priorities or at the specific priority.
The call does not dequeue anything, nor does it change any other internal state!
The next step after awaiting is to call the TryDequeu method!

EnqueuedItemsAsync() returns a completed ICompletionUC awaitable if there are already any items in the priority queue.
The await then completes right away and the code after the await executes synchronously!
Exactly as in the case:

C#
await Task.CompletedTask;
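The difference between awaiting an already completed awaitable and a pending one can be observed with plain .NET tasks. The sketch below uses only Task and TaskCompletionSource; CompletedAwaitSketch and its methods are hypothetical names, and ICompletionUC itself is not involved.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletedAwaitSketch
{
    public static async Task<string> AwaitCompletedAsync()
    {
        // Already completed: the method does not yield here,
        // the code below runs synchronously on the caller's thread.
        await Task.CompletedTask;
        return "ran synchronously";
    }

    public static async Task<string> AwaitPendingAsync(Task pending)
    {
        // Not completed yet: the method returns an incomplete task to the caller here
        // and the rest runs later as a continuation.
        await pending;
        return "ran as continuation";
    }

    public static void Main()
    {
        Task<string> fast = AwaitCompletedAsync();
        Console.WriteLine(fast.IsCompleted); // True

        var gate = new TaskCompletionSource<bool>();
        Task<string> slow = AwaitPendingAsync(gate.Task);
        Console.WriteLine(slow.IsCompleted); // False

        gate.SetResult(true);                // releases the pending await
        Console.WriteLine(slow.Result);      // ran as continuation
    }
}
```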

EnqueuedItemsAsync(TPriority) likewise returns a completed ICompletionUC awaitable if there are already any items in the priority queue for the requested TPriority.
In that case the await also completes right away and the code after it executes synchronously, exactly as with await Task.CompletedTask.

EnqueuedItemsAsync does not affect the processing of items in the priority queue in any way!

 

Using the PriorityQueueUC

In this case, we construct the queue and then call Enqueue and TryDequeu, and potentially Count and HasItems, from the same or different threads:

 

C#
public enum QueuePriority { Lower, Normal, Higher }
QueuePriority[] descendingPriorities = { QueuePriority.Higher, QueuePriority.Normal, QueuePriority.Lower };
var pQueue = new PriorityQueueUC<QueuePriority, string>(descendingPriorities);

C#
pQueue.Enqueue(QueuePriority.Higher, "An item with higher priority");

C#
// Dequeue in descending order of priorities.
string dequeued;
if (pQueue.TryDequeu(out dequeued)) Console.WriteLine($"Dequeued: {dequeued}");

C#
// Dequeue in descending order of priorities and report the priority of the item.
string dequeued;
QueuePriority? priority;
if (pQueue.TryDequeu(out dequeued, out priority)) Console.WriteLine($"Dequeued: {dequeued}, Priority: {priority}");

C#
// Dequeue only from the requested priority level.
string dequeued;
QueuePriority? priority;
if (pQueue.TryDequeu(out dequeued, out priority, QueuePriority.Lower)) Console.WriteLine($"Dequeued: {dequeued}, Priority: {priority}");

 

Using the PriorityQueueNotifierUC

Construction is almost the same:

C#
public enum QueuePriority { Lower, Normal, Higher }
QueuePriority[] descendingPriorities = { QueuePriority.Higher, QueuePriority.Normal, QueuePriority.Lower };
var pQueueNotifier = new PriorityQueueNotifierUC<QueuePriority, string>(descendingPriorities);

Enqueue:

C#
pQueueNotifier.Enqueue(QueuePriority.Higher, "An item with higher priority");

 

C#
Task.Run(() => DescendingPriorityConsumer(pQueueNotifier));

private async Task DescendingPriorityConsumer(PriorityQueueNotifierUC<QueuePriority, string> pQueueNotifier)
{
    while (true)
    {
        string dequeued;
        await pQueueNotifier.EnqueuedItemsAsync();
        if (!pQueueNotifier.TryDequeu(out dequeued)) continue;
        Console.WriteLine($"Dequeued: {dequeued}");
    }
}

 

C#
Task.Run(() => DescendingPriorityConsumer2(pQueueNotifier));

private async Task DescendingPriorityConsumer2(PriorityQueueNotifierUC<QueuePriority, string> pQueueNotifier)
{
    while (true)
    {
        string dequeued;
        QueuePriority? priority;
        await pQueueNotifier.EnqueuedItemsAsync();
        if (!pQueueNotifier.TryDequeu(out dequeued, out priority)) continue;
        Console.WriteLine($"Dequeued: {dequeued}, Priority: {priority}");
    }
}

 

C#
Task.Run(() => OverriddenPriorityConsumer(pQueueNotifier));

private async Task OverriddenPriorityConsumer(PriorityQueueNotifierUC<QueuePriority, string> pQueueNotifier)
{
    while (true)
    {
        string dequeued;
        QueuePriority? priority;
        // The priority passed to EnqueuedItemsAsync and TryDequeu must match exactly,
        // otherwise we could burn CPU in a busy loop!
        await pQueueNotifier.EnqueuedItemsAsync(QueuePriority.Lower);
        if (!pQueueNotifier.TryDequeu(out dequeued, out priority, QueuePriority.Lower)) continue;
        Console.WriteLine($"Dequeued: {dequeued}, Priority: {priority}");
    }
}

 

 

Examples and GreenSuperGreen library

This article is based on my library, GreenSuperGreen, with sources available on GitHub and a .NET 4.6 package on NuGet.
The source code available for download as part of this article targets the NuGet package.

The GreenSuperGreen library has more interesting classes to explore than restricted priority queues...

 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)