Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C++

Robust C++: Queue Templates

0.00/5 (No votes)
23 Jun 2020GPL34 min read 8.9K   119  
Implementing corruptible queues that don't manage memory
This article describes one-way and two-way queue templates that can be used instead of STL queues. Objects in the queue provide the links, so memory is used efficiently. And if a queue gets trampled, the system can try to recover.

Introduction

If you're using the STL and need a queue, you'll probably opt for std::deque or std::list. For some systems, however, using these templates can be problematic:

  1. Their memory overhead can be significant in a system with many short queues.
  2. They allocate and manage memory, which can conflict with the need to provide predictable response times in a hard real-time system.
  3. They hide the queue data structure, which makes it difficult for a robust system to recover from a queue corruption caused by an unguarded critical region or trampled memory.

This article introduces templates that can be used when these issues make it undesirable to use an STL template.

Background

The templates in this article come from the Robust Services Core (RSC), an open-source framework for developing robust C++ applications. The templates are

  • Q1Way, for one-way queues, and
  • Q2Way, for two-way queues.

Because their implementations are similar, this article only describes Q1Way, which is used more frequently. Q2Way is only used for performance reasons, when elements must often be removed from the middle of a relatively long queue. Q2Way also supports backwards iteration, but applications seldom need that capability.

Using the Code

The code has been edited to remove things that are not central to the templates. These things appear in the full versions of the code in the attached .zip file, so you can remove them if you're not using RSC.

The crux of the design is that a queueable object defines a Q1Link member which provides the link to the next object in the queue. Because of this, the template does not need to allocate memory for elements on the queue.

Here is the code for Q1Link, except for its destructor.

C++
//  Link for an item on a one-way queue.  An object that resides on a one-way
//  queue includes this as a member and implements a LinkDiff function that
//  returns the distance between the top of the object and its Q1Link member.
//
class Q1Link
{
   template<class T> friend class Q1Way;
public:
   //  Public because an instance of this class is included in objects that
   //  can be queued.
   //
   Q1Link() : next(nullptr) { }

   //  Public because an instance of this class is included in objects that
   //  can be queued.
   //
   ~Q1Link();

   //  Deleted to prohibit copying.
   //
   Q1Link(const Q1Link& that) = delete;
   Q1Link& operator=(const Q1Link& that) = delete;

   //  Returns true if the item is on a queue.
   //
   bool IsQueued() const { return next != nullptr; }

   //  Returns a string for displaying the link.
   //
   std::string to_str() const
   {
      std::ostringstream stream;
      stream << next;
      return stream.str();
   }
private:
   //  The next item in the queue.  Because Q1Way uses circular queues, a
   //  value of nullptr means that the item is not on a queue.
   //
   Q1Link* next;
};

An object should be removed from a queue before it is deleted. But if it's still queued, the Q1Link destructor removes it to prevent a broken link from corrupting the queue and orphaning the objects that follow it:

C++
Q1Link::~Q1Link()
{
   //  If the item is still queued, exqueue it.  This is a serious problem if
   //  it is the tail item, because it will leave the queue head pointing to
   //  a deleted item.
   //
   if(next == nullptr) return;

   auto prev = this;
   auto curr = next;
   auto i = INT16_MAX;

   while((i >= 0) && (curr != nullptr))
   {
      if(curr == this)
      {
         auto before = INT16_MAX - i;

         if(before == 0)
         {
            Debug::SwLog("tail item was still queued", before);
            prev->next = nullptr;
         }
         else
         {
            Debug::SwLog("item was still queued", before);
            prev->next = curr->next;
         }
         return;
      }

      --i;
      prev = curr;
      curr = curr->next;
   }

   Debug::SwLog("item is still queued", INT16_MAX - i);
}

Q1Link prohibited copying, which can corrupt a queue by duplicating a link. Because a queueable object defines a Q1Link member, this also makes the object uncopyable. If it must support copying, Q1Link's copy constructor and copy operator could be implemented by invoking its destructor (to make sure the item isn't queued) and then initializing next to nullptr so that the item's new incarnation remains unqueued.

Now we can start to look at the Q1Way template itself. Note that it points to the queue's tail element, which makes both enqueue and dequeue operations efficient. The queued objects actually form a circular queue.

C++
//  One-way queue.  Recommended unless items are often exqueued, which
//  can be expensive.
//  o no items: tail_.next = nullptr
//  o one item: tail_.next = item, item.next = item (points to itself)
//  o two or more items: tail_.next = last item, last item.next = first
//    item, second last item.next = last item (circular queue)
//
template<class T> class Q1Way
{
public:
   //  Initializes the queue header to default values.  Before the queue
   //  can be used, Init must be invoked.
   //
   Q1Way() : tail_(nullptr), diff_(NilDiff) { }

   //  Cleans up the queue.
   //
   ~Q1Way()
   {
      if(tail_.next == nullptr) return;
      Purge();
   }

   //  Deleted to prohibit copying.
   //
   Q1Way(const Q1Way& that) = delete;
   Q1Way& operator=(const Q1Way& that) = delete;

   //  Initializes the queue so that it can be used.
   //
   void Init(ptrdiff_t diff)
   {
      tail_.next = nullptr;  // queue is empty
      diff_ = diff;          // distance from each item's vptr to its Q1Link
   }
private:
   //  For initializing diff_.
   //
   static const ptrdiff_t NilDiff = -1;

   //  The queue head, which actually points to the tail item.
   //  If the queue is empty, tail_.next is nullptr.
   //
   Q1Link tail_;

   //  The distance from an item's vptr to its Q1Link.
   //
   ptrdiff_t diff_;
};

In order to manipulate the queue links, Q1Way needs to know where each object's Q1Link is located. This is the purpose of diff_, which is the distance (in bytes) from the beginning of the object to its Q1Link member. All the objects in a queue must use the same offset, so a common base class is needed if different classes will have objects in the same queue.

The value of diff_ is returned by a grotty function that RSC classes always name LinkDiff. For example, RSC's Thread class defines a msgq_ member where messages for a thread are queued. These messages derive from MsgBuffer, which therefore needs a Q1Link member and a LinkDiff function that allows Thread to initialize msgq_. However, MsgBuffer derives from Pooled, which already provides a Q1Link member and LinkDiff function for all pooled objects. The outline (omitting other members) therefore looks like this:

C++
class Pooled : public Object
{
public:
   static ptrdiff_t LinkDiff() const;
private:
   Q1Link link_;
};

ptrdiff_t Pooled ::LinkDiff()
{
   uintptr_t local;
   auto fake = reinterpret_cast<const Pooled*>(&local);
   return ptrdiff(&fake->link_, fake);
}

class Thread : public Permanent
{
private:
   Q1Way<MsgBuffer> msgq_;
};

msgq_.Init(Pooled::LinkDiff());  // in Thread's constructor

Once Init has been invoked, the Q1Way function Item can find the location of each element's Q1Link. This function only needs to be used internally and is therefore private:

C++
Q1Link* Item(const T& elem) const
{
   if(diff_ == NilDiff) return nullptr;
   if(&elem == nullptr) return nullptr;
   return (Q1Link*) getptr2(&elem, diff_);  // adds diff_ bytes to &elem
}

An item is usually added to the back of the queue:

C++
bool Enq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;        // error
   if(item->next != nullptr) return false;  // if item is queued, do nothing
   if(tail_.next != nullptr)                // if queue isn't empty
   {                                        // then
      item->next = tail_.next->next;        // item points to first element
      tail_.next->next = item;              // last element points to item
   }
   else
   {
      item->next = item;                    // else item points to itself
   }
   tail_.next = item;                       // tail points to item
   return true;
}

An item can also be added to the front of the queue:

C++
bool Henq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;        // error
   if(item->next != nullptr) return false;  // if item is queued, do nothing
   if(tail_.next != nullptr)                // if queue isn't empty
   {
      item->next = tail_.next->next;        // item points to first element
      tail_.next->next = item;              // last element points to item
   }                                        // tail isn't changed, so item is
   else                                     // after last (and is thus first)
   {
      item->next = item;                    // item points to itself
      tail_.next = item;                    // tail points to item
   }
   return true;
}

An item can even be added to the middle of the queue:

C++
bool Insert(T* prev, T& elem)
{
   if(prev == nullptr)                        // if nothing is previous
      return Henq(elem);                      // then put item at head
   auto item = Item(elem);
   if(item == nullptr) return false;          // error
   auto ante = (Q1Link*)
      getptr2(prev, diff_);                   // put item after previous
   if(item->next != nullptr) return false;    // item must not be queued
   if(ante->next == nullptr) return false;    // prev must be queued
   item->next = ante->next;
   ante->next = item;
   if(tail_.next == ante) tail_.next = item;  // update tail if item is last
   return true;
}

An item is usually removed from the front of the queue:

C++
T* Deq()
{
   if(tail_.next == nullptr)          // if tail points to nothing
      return nullptr;                 // then queue is empty
   Q1Link* item = tail_.next->next;   // set item to first element
   if(tail_.next != item)             // if item wasn't alone
      tail_.next->next = item->next;  // then last now points to second
   else
      tail_.next = nullptr;           // else queue is now empty
   item->next = nullptr;              // item has been removed
   return (T*) getptr1(item, diff_);  // location of item's vptr (subtracts
                                      //   diff_ bytes from item)
}

If necessary, an item can be removed from anywhere on the queue:

C++
bool Exq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;  // error
   if(item->next == nullptr)          // if the item isn't queued
      return true;                    // then return immediately
   if(tail_.next == nullptr)          // if the queue is empty
      return false;                   // then the item can't be there
   if(item->next == item)             // if the item points to itself
   {
      if(tail_.next == item)          // and if the item is also last
      {
         item->next = nullptr;        // then remove the item
         tail_.next = nullptr;        // and the queue is now empty
         return true;
      }
      return false;                   // the item is on another queue
   }
   auto curr = tail_.next;            // starting at the last element,
   while(curr->next != item)          // advance until the item is
   {
      curr = curr->next;              // the next element--but
      if(curr == tail_.next)          // stop after searching the
         return false;                // entire queue
   }
   curr->next = item->next;           // curr's next becomes item's next
   if(tail_.next == item)             // if the item was the tail
      tail_.next = curr;              // then the tail has to back up
   item->next = nullptr;              // the item has been removed
   return true;
}

The functions First and Next support queue traversal. They simply use a pointer to the current item rather than a distinct iterator. Traversal usually takes the form

C++
for(auto item = queue_.First(); item != nullptr; queue_.Next(item))…
C++
T* First() const
{
   if(diff_ == NilDiff) return nullptr;  // queue is not initialized
   Q1Link* item = tail_.next;            // set item to first element
   if(item == nullptr) return nullptr;   // queue is empty
   item = item->next;                    // advance to head
   return (T*) getptr1(item, diff_);     // location of item's vptr
}

//  Updates ELEM to the next item in the queue.  If ELEM is nullptr,
//  provides the first item.  Returns true if there was a next item.
//
bool Next(T*& elem) const
{
   if(diff_ == NilDiff) return false;    // queue is not initialized
   Q1Link* item;                         // item will hold result
   if(elem == nullptr)                   // nullptr means
   {
      item = tail_.next;                 // start at last element
      if(item == nullptr) return false;  // return if the queue is empty
   }
   else
   {
      item = (Q1Link*)
         getptr2(elem, diff_);           // start at the current item
      if(tail_.next == item)             // if that is the last element
      {
         elem = nullptr;                 // then there are no more
         return false;
      }
   }
   item = item->next;                    // advance to next element
   if(item == nullptr)                   // make sure the item was queued
   {
      elem = nullptr;
      return false;
   }
   elem = (T*) getptr1(item, diff_);     // location of item's vptr
   return true;
}

//  Returns the item that follows ELEM.
//
T* Next(const T& elem) const
{
   auto item = Item(elem);
   if(item == nullptr) return nullptr;     // error
   if(tail_.next == item) return nullptr;  // return if item was last
   item = item->next;                      // advance to the next element
   if(item == nullptr) return nullptr;     // return if item wasn't queued
   return (T*) getptr1(item, diff_);       // location of next item's vptr
}

There are also the usual functions for finding the queue's length:

C++
bool Empty() const { return (tail_.next == nullptr); }

size_t Size() const
{
   if(diff_ == NilDiff) return 0;  // queue is not initialized
   Q1Link* item = tail_.next;      // start at the last item
   if(item == nullptr) return 0;   // check for an empty queue
   size_t count = 1;               // there is at least one element
   item = item->next;              // advance to the first element
   while(item != tail_.next)       // stop when we reach the tail
   {
      item = item->next;
      ++count;
   }
   return count;                   // report the result
}

And finally, there is the equivalent of clear:

C++
void Purge()
{
   while(tail_.next != nullptr)
   {
      auto item = Deq();
      delete item;
   }
}

If you want to modify the templates, NtIncrement.cpp contains Q1WayCommands and Q2WayCommands, which allow Q1Way and Q2Way functions to be tested via RSC's CLI.

Points of Interest

The following function is provided to test software that tries to recover after a queue corruption:

C++
//  Corrupts ELEM's next pointer for testing purposes.  If ELEM is
//  nullptr, the queue's tail pointer is corrupted instead.
//
void Corrupt(T* elem)
{
   if(elem == nullptr)                     // if no item provided
   {
      tail_.next = (Q1Link*) BAD_POINTER;  // corrupt queue header
      return;
   }
   auto item = (Q1Link*)                   // start at the current item
      getptr2(elem, diff_);
   item->next = (Q1Link*) BAD_POINTER;     // corrupt ELEM's next pointer
}

If the queue of available blocks in an object pool gets corrupted, how the object pool audit repairs it is described here.

History

  • 13th June, 2020: Initial version

License

This article, along with any associated source code and files, is licensed under The GNU General Public License (GPLv3)