This article describes one-way and two-way queue templates that can be used instead of STL queues. Objects in the queue provide the links, so memory is used efficiently. And if a queue gets trampled, the system can try to recover.
Introduction
If you're using the STL and need a queue, you'll probably opt for std::deque or std::list. For some systems, however, using these templates can be problematic:
- Their memory overhead can be significant in a system with many short queues.
- They allocate and manage memory, which can conflict with the need to provide predictable response times in a hard real-time system.
- They hide the queue data structure, which makes it difficult for a robust system to recover from a queue corruption caused by an unguarded critical region or trampled memory.
This article introduces templates that can be used when these issues make it undesirable to use an STL template.
Background
The templates in this article come from the Robust Services Core (RSC), an open-source framework for developing robust C++ applications. The templates are Q1Way, for one-way queues, and Q2Way, for two-way queues.
Because their implementations are similar, this article only describes Q1Way, which is used more frequently. Q2Way is only used for performance reasons, when elements must often be removed from the middle of a relatively long queue. Q2Way also supports backwards iteration, but applications seldom need that capability.
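To see why a second link helps when items are often removed from the middle of a queue, consider the following sketch. It is not RSC's Q2Way: the names TwoWayLink, InitHead, InsertAfter, Unlink, and Count are hypothetical, and the sketch assumes a circular list with a sentinel head. Removal only touches the item's two neighbours, whereas a one-way queue must first walk the queue to find the item's predecessor.

```cpp
#include <cassert>

// Hypothetical two-way link, for illustration only (not RSC's Q2Way).
struct TwoWayLink
{
   TwoWayLink* prev = nullptr;
   TwoWayLink* next = nullptr;   // nullptr means "not queued"
};

// Makes HEAD an empty circular list: it points to itself in both directions.
inline void InitHead(TwoWayLink& head)
{
   head.prev = &head;
   head.next = &head;
}

// Inserts ITEM immediately after POS, which must already be on a list.
inline void InsertAfter(TwoWayLink& pos, TwoWayLink& item)
{
   assert(pos.next != nullptr);
   item.prev = &pos;
   item.next = pos.next;
   pos.next->prev = &item;
   pos.next = &item;
}

// Removes ITEM in constant time: no search for its predecessor is needed.
inline bool Unlink(TwoWayLink& item)
{
   if(item.next == nullptr) return false;   // not queued
   item.prev->next = item.next;
   item.next->prev = item.prev;
   item.prev = nullptr;
   item.next = nullptr;
   return true;
}

// Counts the items on the list, excluding the sentinel head.
inline int Count(const TwoWayLink& head)
{
   int n = 0;
   for(auto p = head.next; p != &head; p = p->next) ++n;
   return n;
}
```

The cost is one extra pointer per queued object, which may be why the article reserves the two-way queue for relatively long queues.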
Using the Code
The code has been edited to remove things that are not central to the templates. These things appear in the full versions of the code in the attached .zip file, so you can remove them if you're not using RSC.
The crux of the design is that a queueable object defines a Q1Link member which provides the link to the next object in the queue. Because of this, the template does not need to allocate memory for elements on the queue.
Here is the code for Q1Link, except for its destructor.
#include <sstream>
#include <string>

class Q1Link
{
   template<class T> friend class Q1Way;
public:
   Q1Link() : next(nullptr) { }
   ~Q1Link();
   Q1Link(const Q1Link& that) = delete;
   Q1Link& operator=(const Q1Link& that) = delete;

   //  An item is queued if its link is not nullptr.
   bool IsQueued() const { return next != nullptr; }

   //  Returns the link's value as a string.
   std::string to_str() const
   {
      std::ostringstream stream;
      stream << next;
      return stream.str();
   }
private:
   Q1Link* next;
};
An object should be removed from a queue before it is deleted. But if it's still queued, the Q1Link destructor removes it to prevent a broken link from corrupting the queue and orphaning the objects that follow it:
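Q1Link::~Q1Link()
{
   //  If the item is not queued, there is nothing to do.
   if(next == nullptr) return;

   //  Walk the circular queue until we get back to this item, limiting
   //  the number of steps in case the queue is corrupt.
   auto prev = this;
   auto curr = next;
   auto i = INT16_MAX;

   while((i >= 0) && (curr != nullptr))
   {
      if(curr == this)
      {
         auto before = INT16_MAX - i;

         if(before == 0)
         {
            //  The item points to itself, so it was alone on the queue.
            Debug::SwLog("tail item was still queued", before);
            prev->next = nullptr;
         }
         else
         {
            //  PREV is the item's predecessor: make it bypass the item.
            Debug::SwLog("item was still queued", before);
            prev->next = curr->next;
         }
         return;
      }

      --i;
      prev = curr;
      curr = curr->next;
   }

   //  The walk ended without returning to this item.
   Debug::SwLog("item is still queued", INT16_MAX - i);
}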
Q1Link prohibits copying, which can corrupt a queue by duplicating a link. Because a queueable object defines a Q1Link member, this also makes the object uncopyable. If it must support copying, Q1Link's copy constructor and copy assignment operator could be implemented by invoking its destructor (to make sure the item isn't queued) and then initializing next to nullptr so that the item's new incarnation remains unqueued.
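One way to read that suggestion is sketched below. CopyableLink is a hypothetical class, not part of RSC, and its link is public only to keep the sketch short. A copy never inherits the original's link, and an assignment first unlinks the target using the same bounded walk that a destructor would use. The sketch only repairs the links between items; it does not touch any queue header.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical copyable link, for illustration only (not part of RSC).
class CopyableLink
{
public:
   CopyableLink() : next(nullptr) { }
   ~CopyableLink() { Unlink(); }

   // A copy is a new incarnation, so it starts out unqueued.
   CopyableLink(const CopyableLink&) : next(nullptr) { }

   // Assignment removes the target from its queue and leaves it unqueued.
   CopyableLink& operator=(const CopyableLink& that)
   {
      if(this != &that) Unlink();
      return *this;
   }

   bool IsQueued() const { return next != nullptr; }

   CopyableLink* next;   // public only to keep the sketch short
private:
   // Removes this item from the circular queue that it is on, if any.
   void Unlink()
   {
      if(next == nullptr) return;   // not queued

      // Search for the predecessor, giving up after a bounded number of
      // steps in case the queue is corrupt.
      auto prev = next;
      for(int i = 0; (i < INT16_MAX) && (prev != nullptr) &&
         (prev->next != this); ++i)
      {
         prev = prev->next;
      }

      // If a distinct predecessor was found, make it bypass this item.
      if((prev != nullptr) && (prev != this) && (prev->next == this))
      {
         prev->next = next;
      }

      next = nullptr;
      assert(!IsQueued());
   }
};
```

Whether assigning to a queued object should silently dequeue it is a design choice; leaving the target's link untouched would be another defensible option.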
Now we can start to look at the Q1Way template itself. Note that it points to the queue's tail element, which makes both enqueue and dequeue operations efficient. The queued objects actually form a circular queue.
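template<class T> class Q1Way
{
public:
   //  Q1Link's default constructor already sets tail_.next to nullptr.
   //  Init must be invoked before the queue is used.
   Q1Way() : diff_(NilDiff) { }

   //  Deletes any items that are still queued.
   ~Q1Way()
   {
      if(tail_.next == nullptr) return;
      Purge();
   }

   Q1Way(const Q1Way& that) = delete;
   Q1Way& operator=(const Q1Way& that) = delete;

   //  DIFF is the offset from the start of a T to its Q1Link member.
   void Init(ptrdiff_t diff)
   {
      tail_.next = nullptr;
      diff_ = diff;
   }
private:
   static const ptrdiff_t NilDiff = -1;

   Q1Link tail_;      // tail_.next is the last item in the queue
   ptrdiff_t diff_;   // the offset to each item's Q1Link
};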
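Why a single tail pointer is enough can be seen in a stripped-down sketch. Node and TailQueue are hypothetical names, and the links here point directly to nodes instead of using a Q1Link plus an offset. Because the last item points back to the first, the head is always one step from the tail, so both ends of the queue can be reached without walking it.

```cpp
#include <cassert>

// Hypothetical node with an embedded link, for illustration only.
struct Node
{
   int value = 0;
   Node* next = nullptr;   // nullptr means "not queued"
};

// A circular one-way queue that stores only a pointer to its tail.
class TailQueue
{
public:
   // Adds N to the back of the queue.
   bool Enq(Node& n)
   {
      if(n.next != nullptr) return false;   // already queued
      if(tail_ != nullptr)
      {
         n.next = tail_->next;   // the new tail points to the head
         tail_->next = &n;       // the old tail points to the new tail
      }
      else
      {
         n.next = &n;            // a lone item points to itself
      }
      tail_ = &n;
      return true;
   }

   // Removes and returns the item at the front of the queue.
   Node* Deq()
   {
      if(tail_ == nullptr) return nullptr;
      Node* head = tail_->next;
      assert(head != nullptr);      // a queued tail always has a successor
      if(head != tail_)
         tail_->next = head->next;  // the tail now points to the second item
      else
         tail_ = nullptr;           // the queue is now empty
      head->next = nullptr;
      return head;
   }

   bool Empty() const { return tail_ == nullptr; }
private:
   Node* tail_ = nullptr;
};
```

With a head pointer instead, adding to the back would need either a second pointer or a walk to the end of the queue.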
In order to manipulate the queue links, Q1Way needs to know where each object's Q1Link is located. This is the purpose of diff_, which is the distance (in bytes) from the beginning of the object to its Q1Link member. All the objects in a queue must use the same offset, so a common base class is needed if different classes will have objects in the same queue.
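The offset arithmetic can be sketched with the standard offsetof macro. Everything here is hypothetical: Link, Job, JobLinkDiff, ToLink, and ToObject are illustrative names, and Job is deliberately a standard-layout type so that offsetof is well defined for it. ToLink and ToObject play the roles that RSC's getptr2 and getptr1 helpers appear to play in the code in this article, but they are not those functions.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical link type, for illustration only.
struct Link
{
   Link* next = nullptr;
};

// Hypothetical queueable type. It is standard-layout, so offsetof is
// well defined for it.
struct Job
{
   int id;
   Link link;
};

// Returns the offset (in bytes) from the start of a Job to its link.
inline std::ptrdiff_t JobLinkDiff()
{
   return static_cast<std::ptrdiff_t>(offsetof(Job, link));
}

// Object address plus offset gives the address of the link.
inline Link* ToLink(Job* obj, std::ptrdiff_t diff)
{
   assert(obj != nullptr);
   return reinterpret_cast<Link*>(reinterpret_cast<char*>(obj) + diff);
}

// Link address minus offset gives the address of the object.
inline Job* ToObject(Link* link, std::ptrdiff_t diff)
{
   assert(link != nullptr);
   return reinterpret_cast<Job*>(reinterpret_cast<char*>(link) - diff);
}
```

A queue that stores only the offset can therefore move between an object and its link in either direction without knowing anything else about the object's layout.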
The value of diff_ is returned by a grotty function that RSC classes always name LinkDiff. For example, RSC's Thread class defines a msgq_ member where messages for a thread are queued. These messages derive from MsgBuffer, which therefore needs a Q1Link member and a LinkDiff function that allows Thread to initialize msgq_. However, MsgBuffer derives from Pooled, which already provides a Q1Link member and LinkDiff function for all pooled objects. The outline (omitting other members) therefore looks like this:
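class Pooled : public Object
{
public:
   //  A static member function cannot be const-qualified.
   static ptrdiff_t LinkDiff();
private:
   Q1Link link_;
};

ptrdiff_t Pooled::LinkDiff()
{
   //  Pretend that a Pooled object exists at LOCAL's address and find
   //  the distance from its start to its link_ member.
   uintptr_t local;
   auto fake = reinterpret_cast<const Pooled*>(&local);
   return ptrdiff(&fake->link_, fake);
}

class Thread : public Permanent
{
private:
   Q1Way<MsgBuffer> msgq_;
};

//  Thread initializes its message queue with the offset:
msgq_.Init(Pooled::LinkDiff());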
Once Init has been invoked, the Q1Way function Item can find the location of each element's Q1Link. This function only needs to be used internally and is therefore private:
Q1Link* Item(const T& elem) const
{
   if(diff_ == NilDiff) return nullptr;   // Init has not been invoked
   if(&elem == nullptr) return nullptr;   // guard against a null reference
   return (Q1Link*) getptr2(&elem, diff_);
}
An item is usually added to the back of the queue:
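bool Enq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;         // error
   if(item->next != nullptr) return false;   // item is already queued
   if(tail_.next != nullptr)                 // queue is not empty
   {
      item->next = tail_.next->next;         // item points to first item
      tail_.next->next = item;               // last item points to item
   }
   else
   {
      item->next = item;                     // item points to itself
   }
   tail_.next = item;                        // item is the new tail
   return true;
}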
An item can also be added to the front of the queue:
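bool Henq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;         // error
   if(item->next != nullptr) return false;   // item is already queued
   if(tail_.next != nullptr)                 // queue is not empty
   {
      item->next = tail_.next->next;         // item points to first item
      tail_.next->next = item;               // tail points to item
   }
   else
   {
      item->next = item;                     // item points to itself
      tail_.next = item;                     // item is also the tail
   }
   return true;
}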
An item can even be added to the middle of the queue:
bool Insert(T* prev, T& elem)
{
   if(prev == nullptr) return Henq(elem);      // no predecessor: add to front
   auto item = Item(elem);
   if(item == nullptr) return false;           // error
   auto ante = (Q1Link*) getptr2(prev, diff_);
   if(item->next != nullptr) return false;     // item is already queued
   if(ante->next == nullptr) return false;     // prev is not queued
   item->next = ante->next;
   ante->next = item;
   if(tail_.next == ante) tail_.next = item;   // item is the new tail
   return true;
}
An item is usually removed from the front of the queue:
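T* Deq()
{
   if(tail_.next == nullptr) return nullptr;   // queue is empty
   Q1Link* item = tail_.next->next;            // the first item
   if(tail_.next != item)
      tail_.next->next = item->next;           // tail points to second item
   else
      tail_.next = nullptr;                    // queue is now empty
   item->next = nullptr;                       // item is no longer queued
   return (T*) getptr1(item, diff_);
}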
If necessary, an item can be removed from anywhere on the queue:
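bool Exq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;         // error
   if(item->next == nullptr) return true;    // item is not queued
   if(tail_.next == nullptr) return false;   // queue is empty
   if(item->next == item)                    // item points to itself
   {
      if(tail_.next == item)                 // it is the only item queued
      {
         item->next = nullptr;
         tail_.next = nullptr;
         return true;
      }
      return false;                          // item is alone on another queue
   }
   auto curr = tail_.next;                   // search for item's predecessor
   while(curr->next != item)
   {
      curr = curr->next;
      if(curr == tail_.next) return false;   // wrapped around: not found
   }
   curr->next = item->next;                  // predecessor bypasses item
   if(tail_.next == item) tail_.next = curr; // predecessor is the new tail
   item->next = nullptr;
   return true;
}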
The functions First and Next support queue traversal. They simply use a pointer to the current item rather than a distinct iterator. Traversal usually takes the form
for(auto item = queue_.First(); item != nullptr; queue_.Next(item))…
T* First() const
{
   if(diff_ == NilDiff) return nullptr;
   Q1Link* item = tail_.next;
   if(item == nullptr) return nullptr;   // queue is empty
   item = item->next;                    // the first item follows the tail
   return (T*) getptr1(item, diff_);
}

bool Next(T*& elem) const
{
   if(diff_ == NilDiff) return false;
   Q1Link* item;
   if(elem == nullptr)
   {
      item = tail_.next;                 // start at the front of the queue
      if(item == nullptr) return false;
   }
   else
   {
      item = (Q1Link*) getptr2(elem, diff_);
      if(tail_.next == item)             // elem was the last item
      {
         elem = nullptr;
         return false;
      }
   }
   item = item->next;
   if(item == nullptr)                   // elem was not queued
   {
      elem = nullptr;
      return false;
   }
   elem = (T*) getptr1(item, diff_);
   return true;
}

T* Next(const T& elem) const
{
   auto item = Item(elem);
   if(item == nullptr) return nullptr;
   if(tail_.next == item) return nullptr;   // elem was the last item
   item = item->next;
   if(item == nullptr) return nullptr;      // elem was not queued
   return (T*) getptr1(item, diff_);
}
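The stopping rule that these functions share (traversal ends once the current item is the tail) can be sketched on a bare circular list. Elem, First, Next, and Sum below are hypothetical free functions that take the tail as a parameter; they are not the Q1Way members of the same names.

```cpp
#include <cassert>

// Hypothetical queued element, for illustration only.
struct Elem
{
   int value = 0;
   Elem* next = nullptr;
};

// Returns the first item of the circular queue whose tail is TAIL.
inline Elem* First(Elem* tail)
{
   return (tail == nullptr) ? nullptr : tail->next;
}

// Returns the item after CURR, or nullptr once CURR is the tail.
inline Elem* Next(Elem* tail, Elem* curr)
{
   if((curr == nullptr) || (curr == tail)) return nullptr;
   return curr->next;
}

// Sums the values on the queue by traversing it from front to back.
inline int Sum(Elem* tail)
{
   int total = 0;
   for(auto e = First(tail); e != nullptr; e = Next(tail, e))
   {
      assert(e->next != nullptr);   // every queued item has a link
      total += e->value;
   }
   return total;
}
```

Without the check against the tail, a traversal of a circular queue would never terminate.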
There are also the usual functions for finding the queue's length:
bool Empty() const { return (tail_.next == nullptr); }

size_t Size() const
{
   if(diff_ == NilDiff) return 0;
   Q1Link* item = tail_.next;
   if(item == nullptr) return 0;   // queue is empty
   size_t count = 1;               // count the tail
   item = item->next;
   while(item != tail_.next)       // count items until we reach the tail
   {
      item = item->next;
      ++count;
   }
   return count;
}
And finally, there is the equivalent of clear:
void Purge()
{
   //  Dequeue and delete each item until the queue is empty.
   while(tail_.next != nullptr)
   {
      auto item = Deq();
      delete item;
   }
}
If you want to modify the templates, NtIncrement.cpp contains Q1WayCommands and Q2WayCommands, which allow Q1Way and Q2Way functions to be tested via RSC's CLI.
Points of Interest
The following function is provided to test software that tries to recover after a queue corruption:
void Corrupt(T* elem)
{
   if(elem == nullptr)
   {
      tail_.next = (Q1Link*) BAD_POINTER;   // corrupt the queue header
      return;
   }
   auto item = (Q1Link*) getptr2(elem, diff_);
   item->next = (Q1Link*) BAD_POINTER;      // corrupt ELEM's link
}
If the queue of available blocks in an object pool gets corrupted, how the object pool audit repairs it is described here.
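As a rough illustration of how a corrupted queue might be detected, the sketch below walks a circular queue while checking every link before following it. It is not RSC's audit. Block, InPool, and Audit are hypothetical names, and the sketch assumes that all queueable blocks live in one contiguous array, so that a valid link must point at one of its elements.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical pooled block, for illustration only.
struct Block
{
   Block* next = nullptr;
};

// Returns true if P points at one of the COUNT blocks in POOL.
inline bool InPool(const Block* p, const Block* pool, std::size_t count)
{
   assert(pool != nullptr);
   auto addr = reinterpret_cast<std::uintptr_t>(p);
   auto first = reinterpret_cast<std::uintptr_t>(pool);
   auto last = reinterpret_cast<std::uintptr_t>(pool + count);
   if((addr < first) || (addr >= last)) return false;
   return ((addr - first) % sizeof(Block)) == 0;
}

// Walks the circular queue whose tail is TAIL. Returns the number of items
// if the walk gets back to the tail within COUNT steps. Returns -1 if a
// link is null, points outside the pool, or the walk does not close.
inline long Audit(const Block* tail, const Block* pool, std::size_t count)
{
   if(tail == nullptr) return 0;               // empty queue
   if(!InPool(tail, pool, count)) return -1;   // bad queue header

   long n = 0;
   const Block* curr = tail;
   do
   {
      curr = curr->next;
      if(!InPool(curr, pool, count)) return -1;       // bad link
      if(++n > static_cast<long>(count)) return -1;   // walk did not close
   }
   while(curr != tail);
   return n;
}
```

Validating each pointer before dereferencing it is what allows the walk to survive the kind of damage that Corrupt simulates; how the queue is then repaired is a separate question.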
History
- 13th June, 2020: Initial version