Preface
Hello World
#include "CommandQueue.hpp"
CommandQueue commandQ;
void helloWorld() { printf( "Hello World" ); }
int main()
{
commandQ( helloWorld );
commandQ( [] { printf( "Hello World" ); } );
commandQ( [] { printf( "Hello " ); } )( [] { printf( "World" ); } );
commandQ.join();
return 0;
}
return a value from a thread
FILE* f;
commandQ.returns( fopen, &f, "image.png", "rb" ); // "rb": a PNG is binary, open it in binary mode
commandQ.join();
fclose( f );
Origins of the code
This code was originally designed as part of a high performance rendering engine, where draw commands (Command Queue) are executed on a dedicated rendering thread. Issuing rendering commands needed to be extremely optimized, because in rendering, high performance and FPS (frames-per-second) are everything!
It had to be flexible enough to issue any kind of graphics call: from loading and drawing models, textures and shaders, to making state changes and handling events like viewport changes; allowing me to queue multiple function calls for rendering in the background while other code, like collision detection, executes in parallel. The design eventually became so useful and flexible that I decided to use it for all inter-thread communication: general purpose event/message queues, networking, GUI input (mouse/keyboard events), gameplay logic, AI, audio streaming etc.
While doing some unrelated research, I noticed other people using std::queue/vector, ring buffers and even linked lists to do the same thing (to execute functions from a thread); and while all these techniques work, they are very slow in comparison! I decided to make my code generally available because I think it might be useful to someone.
Background
I believe what you will find here is possibly the easiest to use, most flexible, powerful and fastest thread communication code available, executing up to 10 million function calls per second on a single Core i5 thread. This is my presentation to you, but what you will see here doesn't even scratch the surface of the full potential of this code! It's limited only by your imagination!
I never thought I would share this code with anyone, because I love the elegance and simplicity of its design so much; it's the result of 25 years' experience, over 10 years of that spent writing different multi-threaded implementations; from events and messages, to high performance rendering and network queues; with constant improvements and refinements made to the design, flexibility, speed and ease of use, over many years.
I want to believe that at least 50% of the people considering multi-threading want to do it for performance reasons, and I'll be happy if even 1% of you are willing to read and consider my implementation! There are so few articles written these days on lower level techniques like this, that I felt I might be able to contribute something meaningful to anyone willing to understand and appreciate this implementation.
But please, don't read this article with the mindset to judge my method or my code. First try to fully understand it: why I have done it this way, what problem I am trying to solve; then you might come to appreciate and love the simplicity, the potential and the raw speed! Be patient and take the time to read carefully! If you don't understand it the first time, then read it again, there are gems hidden between the lines! To fully understand and appreciate the full potential of this code, you need to be reading and thinking with an open mind! As you read my code and technique, you should be thinking "how can I apply this concept to my project, my library, my application or my game". And before you flame my use of function pointers, raw pointer arithmetic and unstructured buffers, consider that this technique might be useful to someone else.
This is about the evolution of an idea, a concept, a methodology, an alternative way for you to think about, and implement, thread communication. Allowing threads to communicate in a more expressive, unrestricted and powerful way; with the added bonus of doing it at the speed of light! It's about writing code in a way that will unlock the untapped raw potential, power and throughput of multi-threading already at your fingertips.
As far as I'm aware, this is the only article discussing this technique in relation to multi-threading. But I'm not really presenting anything new or revolutionary, especially to low-level programmers, because everything here has already been done before! What I want to do is get people thinking again, to expand their thoughts and ideas on how you go about solving the problem of thread communication. I want to show you how you can package all the knowledge you already have; on function pointers, jump-tables, templates, pointers-to-pointers, pointer dereferencing, single/double/multi/ring buffers, thread synchronization, lock-free threading, stacks and queues; and combine all that knowledge in creating your own, powerful, flexible, high performance, simple and easy to use thread communication protocol!
However, nothing about this method was taken from another article written on thread communication. I'm actually very sad when I see the various techniques used by other programmers releasing their designs on this subject. In fact, I haven't found a single article or method anywhere in the world which I would even remotely consider using myself. Not finding a suitably written article on this subject prompted me to reconsider, and release my code.
So sit back, relax, and let me take you on a magic carpet ride into the depths and magic of my command queue!
This article took me over a week to write (100+ hours). I'm not a writer, I just hope you appreciate the time spent!
Target audience
- You are looking for a fast, flexible and easy way to execute multiple functions on a light-weight worker thread.
- You want to execute functions with return values on a worker thread; but you would never use `futures` and `promises`! Don't worry, I didn't use these abominations either! Your sanity is safe with me!
- You have some interest in lower-level C/C++ programming techniques, or you can appreciate them!
- This article is primarily targeted at C/C++ and possibly Assembler programmers, but I guess any language that accepts a function pointer as a valid data type can use this technique.
- This article is not about how inter-thread communication works; I assume you already know how to send messages to threads, and I also assume you have already successfully implemented one or more working methods before. If you have never implemented multi-threading before, then this article is probably not for you!
My personal coding philosophy
I would like to say something about my personal coding style before we begin, so you understand my position!
Agreement or disagreement on any of these is not a requirement, just as long as you accept that it's my right!
- I come from a game engine, rendering, networking background where code performance should be appreciated.
- I love beautiful code. Not only the structure of the code, the layout and spacing; but also the beauty in its design!
- I'm an advocate of using Orthodox C++, Sane C++ and Nominal C++, or most anti-modern C++ styles!
- I do use templates and operator overloading in moderation, especially when they have no runtime impact.
- I like using nullptr! auto can be nice. I would never, ever use std::shared_ptr or std::unique_ptr!
- I very rarely use virtual functions. But there's nothing stopping you from building a powerful and flexible command queue with virtual functions!
- I never use RTTI, Exceptions or Boost! I hate them all! Don't ask me why! Just deal with it and move on!
- I personally never use STL containers; std::string/queue/array/vector etc. Too slow! It's just personal!
- I wouldn't normally use std::atomic, std::mutex, std::thread, but I made an exception for this code! I don't really mind them, I just prefer to make direct API calls, it's what I'm used to! And these are slower! Yes they are!
- I never use static_cast/dynamic_cast/reinterpret_cast! Too ugly! Use them if you want, but I don't!
- I'm an advanced Assembler programmer, and an advanced Intel intrinsics developer (I'm fluent in SSE/AVX intrinsics)!
- I only develop on Intel/AMD 32-bit (x86) / 64-bit (x64)! I just never had the need to develop on anything else!
- I normally like to develop code as low-level as I can, the lower the better! But within reason!
- I also have layers of abstraction, but I normally build them on top of the lowest level I can, and with good reason!
- I normally prefer using direct kernel API calls, but I prefer using malloc/free instead of HeapAlloc!
- When I have a job to do using something I detest, I first consider the implications of its use and might protest, with valid reasons. If my protests fall on deaf ears, I just quietly use it, but at least I tried! I can always program at home the way I want!
`If we never re-invented the wheel, wouldn't we still be driving horse drawn carriages?` - me
If you love writing high performance code, then we share the same passion!
Dedication
This code and technique stands on the shoulders of giants!
I dedicate this article and technique to John Carmack, Mark J. Kilgard, Michael Abrash, Bob Jenkins and Agner Fog, you are all legends in my eyes! Will we ever see your kind again?
Introduction
This article is about the function call protocol and double-buffered queues used in the Command Queue.
The Command Queue is an easy to use, light-weight, high performance, low overhead, low latency, header-only C++11 class. On instantiation, the object creates a lock-free double-buffered queue and spawns a single thread (the consumer thread); the thread and buffers stay active for the lifetime of the object.
The term `command queue` is used to refer to the custom designed protocol used internally for both storing and executing function calls. The double-buffers are used to store both the function pointers and the input values required by the functions (commands); one buffer is used by the consumer thread (internal thread) to execute commands, while the other buffer is available to producer threads (your threads) for new commands. After the internal consumer thread has finished executing all commands (function calls) on the internal buffer, it resets and rotates buffers with the producer threads, giving them a new/clean/reset buffer while it processes any commands on the other buffer. When no more commands are available, the internal consumer thread enters a wait-state (sleeps) until new commands are added.
The Command Queue is designed for maximum throughput of function calls, without particular regard for memory consumption. By throughput and low latency, I refer both to the time between a new command being added to the queue and its execution, and to the time between executing consecutive commands on a queue; these are separate scenarios, and both are tuned for maximum throughput by low-level optimizations.
If insufficient space is available for new commands from a producer thread, the buffer is expanded dynamically. In a typical real world scenario, the buffers shouldn't exceed more than a few hundred bytes or KB, due to the fast execution and rotation of the buffers. However, during a sustained synthetic stress test of 1 trillion function calls (10 million function calls per second vs. std::thread's 200,000), the memory consumption reached 4MB x2 RAM (8MB). Memory consumption will largely depend on how many functions are added/called and how long the function calls take to complete. Memory usage is typically 20-bytes per function call, which is normally freed (reused) immediately after the function is called, by buffer rotation.
Note: The code and examples are made as easy and readable as possible! I know it can be difficult to read and follow other people's code; it sometimes takes a while to understand the concept as a whole. Some of the concepts discussed here might be `advanced` (it depends entirely on your skillset, I don't know who you are or what you know), but I want to break it down and keep things as simple and easy to understand as I can! The important aspect is the overall concept, not the actual code/implementation! You can implement this any way you want!
Pros and Cons
Pros:
- Ability to return values directly from a function executed on another thread! Without futures & promises!
- low overhead: 8-bytes (x86) / 12-bytes (x64) + data per command, +8-bytes extra for execute() stubs
- low latency inner loop: 6 instructions to execute every command in the queue; this represents the `latency` between consecutive function calls, from beginning to end: lea, call, mov, add, cmp, jb
- zero execution overhead: the functions have the opportunity to generate no additional call overhead! For example, printf("Hello World") compiled with VS2015 64-bit Release, with stack frames disabled, generated 2 instructions in total inside the callback: mov, jmp. This also depends entirely on your functions!
- fast allocations to consecutive memory without gaps, using an internal allocator on general purpose self-managed buffers. Calls to `alloc` just reserve space on the pre-allocated buffer; only realloc() is used when necessary to expand a buffer. The buffers are just reused and rotated.
- All input data required for a function call, including the function pointer, is stored consecutively, followed directly by the next command.
- no memory fragmentation, no linked lists or separate allocations, no free() statements, no gaps, no temporary memory allocations
- scalable: can execute millions of functions per second from multiple producers with a low memory footprint
- no traditional push/pop on the queue; `pop` is essentially just the `add` instruction in the inner loop
- header-only: the compiler has all the information it needs to make all the optimizations it wants to! On a Release build, VS2015 inlined everything I hoped it would, even with default inlining! The templates were all inlined and optimized, generating very acceptable code to me.
- Compiler warnings from the templates will give you an indication of most (or all) errors, but they are cryptic!
- efficient sleep: the thread goes into an efficient wait-state (sleep) when no commands are present, and is activated immediately when new commands are added to a queue
- no switch-case statements, enums, function type or jump-table lookups etc.
- synthetic benchmark: 10,000,000 function calls per second, in comparison to std::thread's 200,000
Cons:
- The allocations on the buffers are not aligned, they are packed. The unaligned memory might affect some older architectures. Aligning all the memory would require more clock cycles and more memory.
- Functions are currently limited to six input parameters (0-6 input values); an unlimited number is not supported. Support for calling functions with more than 6 parameters can be added!
- Some compiler error messages are very cryptic; it usually looks like the Command Queue is broken, but it's actually just a mismatched number of function input values or data types. It's not usually immediately apparent where the error is!
Use examples
Please download the files! It's too much to put here! There are some comments in the files!
files in the download
examples.cpp - general examples
returns.cpp - several examples of return values
benchmark.cpp - synthetic benchmark between Command Queue and std::thread - not fair, because I don't keep spawning new threads! Just an example of the cost of constantly spawning vs. reusing a thread; it boils down to the Command Queue being able to make 500 function calls in the same time it takes for std::thread to spawn!
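Since benchmark.cpp is in the download and not reproduced here, this is only a minimal sketch of my own of that kind of comparison; the function name tick, the loop counts and the timing code are my assumptions, not the shipped file:

#include "CommandQueue.hpp"
#include <chrono>
#include <cstdio>
#include <thread>

static void tick() {}

int main()
{
    using clock = std::chrono::steady_clock;
    const int N = 1000000;

    CommandQueue commandQ;
    auto t0 = clock::now();
    for ( int i = 0; i < N; ++i )
        commandQ( tick );              // reuse one worker thread for every call
    commandQ.join();
    auto t1 = clock::now();

    for ( int i = 0; i < N / 1000; ++i )
        std::thread( tick ).join();    // spawn (and destroy) a thread per call
    auto t2 = clock::now();

    auto ms = []( clock::duration d )
    { return (long long) std::chrono::duration_cast<std::chrono::milliseconds>( d ).count(); };
    printf( "CommandQueue: %d calls in %lld ms\n", N, ms( t1 - t0 ) );
    printf( "std::thread : %d calls in %lld ms\n", N / 1000, ms( t2 - t1 ) );
    return 0;
}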
Lambdas
Getting lambdas working can be a pain. Firstly, lambdas cannot be passed directly to my templates: I need a `function pointer address` from the lambda, and template argument deduction won't implicitly `cast` a lambda to a function pointer!
commandQ( [] { printf( "Hello World" ); } );
This is fine, because I don't use any templates to handle a void(*)() function!
void execute( void (*function)() )
But I cannot do this for any other function! I need the templates; they are more important to me than lambdas!
Casting lambdas
method 1
commandQ.returns( [] { return "Hello World"; }, &hw );
commandQ.returns( (const char*(*)()) [] { return "Hello World"; }, &hw );
I know right! It's because lambdas CAN be automatically cast, but NOT when you use templates! Because they don't know what type of function to cast to! Nothing I can do about this, unless I write functions with fixed signatures instead of templates.
method 2 (same thing as above!)
const char* hw = "";
const char*( *test )( ) = [] { return "Hello World"; };
commandQ.returns( test, &hw );
commandQ.join();
printf( hw );
Some crazy sh*t! However, this method is a bit easier, because the compiler can help you a bit! In the method above this (method 1), the compiler was giving messages about my templates, which confused the crap out of me. It looked like my templates were broken ... the problem was the `const`, yes, even in the return value, I know right!
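To make the `const` trap concrete, a two-line illustration of my own (not code from the download):

const char*( *good )( ) = [] { return "Hello World"; }; // OK: a string literal is a const char*
char*( *bad )( ) = [] { return "Hello World"; };        // error: const char* will not convert to char*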
Performance & memory usage
Obviously there are many levels and aspects to this subject. So I will try to break it into the various components:
Total memory use: 368-bytes (excluding the double-buffers)
On Windows 7, Visual Studio 2015 64-bit
- std::thread = 16-bytes x1
- std::condition_variable = 72-bytes x2 (144-bytes)
- std::mutex = 80-bytes x2 (160-bytes)
- std::atomic = 8-bytes x2 (16-bytes, one atomic for each buffer)
- buffers = 16-bytes per buffer (8-byte address pointer, 4-bytes total size, 4-bytes used size) (32-bytes for double buffers)
- bool `shutdown` variable
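These figures are specific to one compiler and standard library; here is a quick snippet of my own to reproduce the measurements on your toolchain:

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

int main()
{
    // VS2015 x64 produced the numbers quoted above; yours may differ.
    printf( "std::thread             : %zu bytes\n", sizeof( std::thread ) );
    printf( "std::condition_variable : %zu bytes\n", sizeof( std::condition_variable ) );
    printf( "std::mutex              : %zu bytes\n", sizeof( std::mutex ) );
    printf( "std::atomic<void*>      : %zu bytes\n", sizeof( std::atomic<void*> ) );
    return 0;
}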
Synthetic Benchmark
During synthetic tests comparing the Command Queue to spawning and executing functions on dedicated threads with std::thread objects, the Command Queue was able to execute 10 million function calls per second on a Core i5, compared to std::thread's 200,000 function calls. This is a purely synthetic test, only effective at describing the potential `cost` of spawning threads: the Command Queue can execute up to 500 (small) function calls in the same time it takes to spawn a thread to make the same call.
Due to its light weight and extremely efficient algorithms, it's usually far more efficient to execute consecutive function calls on a single thread than to spawn new threads, or even to run thread pools, where context switching can be detrimental to performance!
Create object
This is fairly costly, because it has to spawn the std::thread and malloc() the double-buffers. I use malloc() because I use realloc() to expand the buffers at runtime when they need more space.
Create thread
In my original design, I used direct API calls, which are obviously faster! However, for this component I just use std::thread, because I generally don't spawn a lot of these objects, so the cost of std::thread is acceptable. 16-bytes; it's slower than native API calls, but I only create one internal thread! So it's tolerable.
Queues
I use a custom internal `queue` implementation, using 16-bytes each. I don't use std::queue; I wrote a long story about why I don't use it, but I eventually deleted it because I don't want to offend too many people! Let's just say std::queue is slower, and I would never use it; in fact I would rather shoot myself in the leg than use std::queue, and let's leave it at that!

No wait, let me just add that I have NO respect for the ability (not personal) of anyone using this abomination (std::queue) for their worker thread `tasks` or execution queues ... ok ... I'll shut up now!
std::condition_variable
72-bytes ... WHAT? I'm really sorry about that! Not my code! That's VS2015 C++11 for you! The condition variable is used for notifications during the object's `wait-state` (sleep). I at least made sure to empty both queues before entering wait-state, because it's very costly, but I tried to minimize the impact!
std::mutex
80-bytes ... OMG! Again, no idea why it uses 80 bytes! I couldn't believe it either! The mutex is required by the condition variable, so I can't avoid it! On the native Windows API, I can use a Critical Section for the condition variable, which is a lot lighter! The std::mutex and std::condition_variable are considerably slower than native API calls! But I need them for C++11 `cross platform` support. I would actually rather implement native Windows API AND Linux API calls than use these monstrosities!
std::atomic
Thank god at least this compiles to an xchg instruction! I could sleep peacefully!
Terminology
command
A command is the combination of a function call (pointer address) and input values (data) required to execute it!
command queue
A command queue is a binary safe, sequentially packed FIFO queue of commands on a pre-allocated buffer.
Two queues exist in a double-buffered configuration. Commands are added to one of the command queues by producer threads, while commands are executed by an internal consumer thread on the other queue.
command queue protocol
The command queue protocol refers to the layout, storage and execution of commands on the command queue.
stub function
The stub functions are special internal functions to `unpack` input values for function calls from the queue. The `stub functions` are also responsible for passing the return values from functions, just like regular functions!
Internal Components (Advanced)
This section is probably only relevant to people interested in the development of similar components, or interested in knowing how I solved some of the challenges like handling `return values`. Knowledge at this level is not necessary or required to use the component! I recommend that most people should just skip this section and go directly to the `Code Analysis` below! It's not necessary to read/understand this!
It's marked `advanced` because I don't think most people are aware of the challenges, level of difficulty or skill set involved in creating a component like this (this is not a normal queue/buffer/thread!), so unless you've tried to make something similar, I hope you don't mind the distinction; and because I imagine most programmers might not be interested in the low level technical details.
So if you're still reading ... I'll try to make the details as `user friendly` as possible, for an advanced audience! I try to make things simple, because even I struggle to follow some articles and concepts, just because they are not presented in a simple way. The concepts here are not that difficult ...
command
pseudo structure
struct command_t
{
    void (*command)(void*);   // command handler (function pointer)
    uint32_t size;            // total command size = offset to the next command
    char data[0..n];          // optional input values (pseudo: variable length)
};
command = command handler (function by address) + (optional) input values
A command is the combination of a function (by address) and the input values required to execute it!
Note: I don't actually use this structure, I write it directly with pointers! It's simple, and only done once.
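As a minimal sketch of my own showing what `writing the structure directly with pointers` means, here is one command packed into, then executed from, a raw buffer (unaligned/packed on purpose, exactly as described in the Cons):

#include <cstdint>
#include <cstdio>

typedef void ( *PFNCommandHandler )( void* data );

static void cmdPrintf( void* data ) { printf( *(const char**) data ); }

int main()
{
    // Pack one command: [handler*][uint32_t size][data]
    char buffer[ 64 ];
    char* p = buffer;
    const char* text = "Hello World";

    *(PFNCommandHandler*) p = cmdPrintf;                 // the command handler
    *(uint32_t*) ( p + sizeof( PFNCommandHandler* ) ) =  // offset to the next command
        sizeof( PFNCommandHandler* ) + sizeof( uint32_t ) + sizeof( text );
    *(const char**) ( p + sizeof( PFNCommandHandler* ) + sizeof( uint32_t ) ) = text;

    // Execute it, exactly like the consumer thread inner loop does:
    ( *(PFNCommandHandler*) p )( p + sizeof( PFNCommandHandler* ) + sizeof( uint32_t ) );
    return 0;
}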
command layout in x64 memory
command handler (lowest level)
typedef void ( *PFNCommandHandler ) ( void* data );
It looks very simple on the outside, it's just a `function pointer` typedef, but it's probably the heart and soul of what I call the `protocol`. When I refer to the protocol, what I really mean is the execution of these functions by the consumer thread (internal thread) inner-loop. ALL functions executed by the consumer thread have the same structure; they don't return anything, and they accept a single data pointer.
The data pointer is exactly that: a pointer to each command's `personal` data store on the command queue buffer. If you look at the memory structure layout of a command above, the data pointer is the pointer to the orange section. If the command required no data (zero) allocation, then this would actually be a pointer to the next command in the queue.
This pointer gives the command full control over what it stored on the command queue buffer. This could literally be anything, from network packets received, raw text/strings, files loaded, raw model/texture/audio data, copies of objects and structures etc.
It's actually a bit confusing to work with the raw data at this level; it requires a lot of pointer-to-pointer work and pointer dereferencing. Also, if your function requires 2, 3 or 4 parameters, it's a bit confusing and error-prone to extract them. For this reason, I created the stub functions, which are explained below.
There is a whole section dedicated to the stub functions, but I just wanted to say something about them here; because they are actually command handlers working at this level with the raw data. They are responsible for extracting parameters from the data pointer passed here and calling the functions. If the function you call has a return value, then the stub functions extract the return address from this data pointer, and write the return value from the function to your address. They are just wrappers at this level to automate working with the data pointer!
pseudo command handler examples
void cmdPrintf( char** pstring ) { printf( *pstring ); }
void cmdRender( Model** model ) { (*model)->render(); }
void cmdSetEvent( HANDLE* ev ) { SetEvent( *ev ); }
void cmdMouseMove( int* xy ) { state->mouse_move( xy[0], xy[1] ); }
void cmdTranslate( float* vec3 ) { glTranslatef( vec3[0], vec3[1], vec3[2] ); }
...
rawExecute( cmdPrintf, "Hello World" );
rawExecute( cmdRender, myModel );
rawExecute( cmdSetEvent, myEvent );
rawExecute( cmdMouseMove, x, y );
rawExecute( cmdTranslate, x, y, z );
As you can see, I do make things easier to work with, by redefining `void* data` to be any pointer I want. These are `easy` examples, with just a single input data type; the situation becomes tricky when you have more than one parameter of different types. For parameters of the same type I can just use array syntax, but for mixed types I have to do manual pointer arithmetic. So that's where the stub functions come in handy! However, at this level I can also copy raw data onto the buffer with rawExecuteWithCopy() ... you can read the code if you want to know more!
command queue
command queue = binary safe, sequentially packed FIFO queue of commands (functions with optional input data)
Commands are added to the command queue by producer threads (your threads), which pre-determine and set the function to call on the internal thread, and write the optional input values for the function call.

Each command (function call) includes its own optional (private) input data buffer, which is inherently thread-safe by design, and stored directly on the command queue.

Once added to the queue, the producer thread (yours) notifies the internal thread that new commands are available on the queue, and the commands are executed sequentially. It's important to note that the internal thread doesn't actually require a notification unless it's already in a wait-state; it will first check both buffers to make sure no commands exist before entering wait-state, because going into wait-state is `expensive`!

Commands and their data are not `pushed on`, or `popped off`, the queue in the traditional sense. After all the commands in the queue are executed, the buffer is reset (by setting a `used` index to zero) and reused/rotated, with the queue made available to the producer threads while the consumer looks for the next set of commands; so data allocations on this buffer are never freed, they are just temporarily reserved space until the buffer is rotated.

I use two independent buffers: one for `producer threads` (your threads) to add commands, while the consumer thread (internal) executes commands (function calls) on the internal buffer. After all the commands are finished executing on the internal buffer, the buffers are swapped and the process begins again. All this is done lock-free!
command queue layout in memory
stub functions
The stub functions are special internal protected functions used to `unpack` function calls from the internal command queue protocol. They are called by the consumer thread when it's ready to call your function.
I wanted to explain something about them, because they might be interesting to people wanting to know how I solved the `problem` of returning values from a thread. They are actually a very powerful `invisible man-in-the-middle` doing a lot of work behind the scenes; and essentially give the Command Queue object the `illusion` of making regular function calls; they are also responsible for writing the return values.
The process starts when you call execute() and returns() on the producer thread (your thread), which write the values required to make your function call onto the command queue buffer. The values written depend on what you want to do, like: how many parameters does your function require, and does it return a value? These functions also write the address of a specific stub function, which is executed on the consumer thread side before your function is called, and will handle extracting the parameters from the command queue buffer for you.
Currently there are 14 stub functions: 7 for execute() and 7 for returns(). Each stub function handles a different case, depending on the number of parameters your function call requires, from zero to six (0-6 input values). There is no provision for functions that require more than 6 parameters; I don't know any functions that need more than 6, but they can be added if you really need it ...
Only execute() and returns() are called on the producer thread; they return after writing values onto the command queue, which will eventually be executed by the consumer thread. So when it's finally time for the consumer thread to execute your command, it first calls the stub function! The stub function then `unpacks` the address of your function, any optional parameters and the optional return value address from the command queue.

The stub function then makes the call to your function, and optionally writes your return value!
What is interesting to note about them is that everything they can do, can be done `manually` by calling rawExecute(), which gives you full control over what you add to the command queue buffer, like writing network packets or raw files directly onto the buffer! The stub functions are really just convenient built-in wrappers for the most common tasks, demonstrating how flexible the command queue protocol is!
64-bit stub function layout on the command queue
Code Analysis
command handler typedef
typedef void ( *PFNCommandHandler ) ( void* data );
Only really used in the `internal` consumer thread in the inner-loop!
Double buffered command queue structure
struct queue_buffer_t
{
    char* commands;   // pre-allocated buffer of packed commands
    uint32_t size;    // total allocated size of the buffer
    uint32_t used;    // bytes currently in use; set to 0 to reset/recycle
};

queue_buffer_t buffer[ 2 ];
std::atomic< queue_buffer_t* > primary { &buffer[ 0 ] };     // brace-init: std::atomic is not copyable
std::atomic< queue_buffer_t* > secondary { &buffer[ 1 ] };
std::mutex mtxDequeue;
std::condition_variable cvDequeue;
std::mutex mtxJoin;
std::condition_variable cvJoin;
std::thread* hThread;
bool volatile shutdown = false;
CommandQueue() { this->init( 256 ); }
CommandQueue( const uint32_t size ) { this->init( size ); }

void init( const uint32_t size )
{
    this->buffer[ 0 ].commands = ( char* ) ::malloc( size );
    this->buffer[ 1 ].commands = ( char* ) ::malloc( size );
    this->buffer[ 0 ].size = size;
    this->buffer[ 1 ].size = size;
    this->buffer[ 0 ].used = 0;
    this->buffer[ 1 ].used = 0;
    this->hThread = new std::thread( &CommandQueue::thread, this );
}

~CommandQueue()
{
    this->shutdown = true;
    this->cvDequeue.notify_one();
    this->hThread->join();
    delete this->hThread;   // free the std::thread object allocated in init()
    free( this->buffer[ 0 ].commands );
    free( this->buffer[ 1 ].commands );
}
commands: expands dynamically at runtime, I normally keep doubling the size with calls to realloc()!
Each buffer is managed independently, and there is nothing stopping you from having a pool of buffers!
The producer thread writes commands to one buffer, while the consumer thread executes commands on the other
Lock-free
std::atomic< queue_buffer_t* > primary = &buffer[ 0 ];
std::atomic< queue_buffer_t* > secondary = &buffer[ 1 ];
My method is really very simple and crude, and might not be the most efficient way to do this!
The producer and consumer threads take ownership of the primary / secondary pointers above by replacing the values in the variables (switching/exchanging) with a nullptr (xchg). When the producer thread is finished adding commands to the buffer, it puts the address back in primary! So if ( primary == nullptr ) then ... it's being used!
Wait-free
These methods are wait-free in a single producer scenario, with my double buffers, the producer thread will always have a buffer available for writing new commands!
However, if you had more producers, two buffers might not be sufficient to stay wait-free! You might need to add more buffers, or change the design to allow multiple producers to write to the same buffer. An exercise for you!
consumer thread
The `internal` thread that executes the commands on a command queue
void thread()
{
    queue_buffer_t* buffer = secondary.exchange( nullptr );
    while ( true )
    {
        buffer = primary.exchange( buffer );
        while ( buffer == nullptr )
            buffer = secondary.exchange( nullptr );
        if ( buffer->used )
        {
            char* base_addr = buffer->commands;
            const char* end = buffer->commands + buffer->used;
            do
            {
                ( *(PFNCommandHandler*) base_addr )( base_addr + sizeof( PFNCommandHandler* ) + sizeof( uint32_t ) );
                base_addr += ( *(uint32_t*) ( base_addr + sizeof( PFNCommandHandler* ) ) );
            }
            while ( base_addr < end );
            buffer->used = 0;
        }
        else if ( this->shutdown )
            break;
        else
        {
            std::unique_lock< std::mutex > lock( mtxDequeue );
            cvDequeue.wait( lock );
            lock.unlock();
        }
    }
}
queue_buffer_t* buffer = secondary.exchange( nullptr );
The consumer thread first takes ownership of the secondary buffer (which will have no commands in it), before entering the main loop.
while ( true )
{
    buffer = primary.exchange( buffer );
    ...
}
Yeah I know, an infinite loop ... shocking isn't it! And no, I will not use while ( !shutdown ) or something similar!
Since the secondary buffer we acquired before this is empty on startup, we don't have to process any commands in it. On entering the loop, we immediately exchange it with the primary buffer, checking whether any producer thread(s) have added commands to it.

During the normal looping process, after processing the current buffer, it's reset/recycled and exchanged with the currently active primary buffer (we give the producer threads a clean buffer in exchange for their `used` buffer). So they can start from a new, fresh buffer, while the consumer thread processes any commands added to the primary buffer they just exchanged.
Note: There is actually high contention for the buffer in the primary variable! Normally the producer thread acquires this buffer, writes data to it, and puts it back. But so does the consumer thread! When the consumer thread is finished processing commands in the buffer, it resets the buffer and also swaps it for the buffer in the primary variable.
while ( buffer == nullptr )
    buffer = secondary.exchange( nullptr );
while ( buffer == nullptr ) then ... wait for a producer thread to finish with its buffer. Entering this loop means a producer thread was busy adding a new command to the buffer when we executed the previous swap. Yes, this uses 100% CPU usage; it's from my rendering thread, which uses 100% anyway! This is the place to put some kind of waitable event system, or just Sleep( 0 );

In the line before this while statement, we put a new clean/empty buffer in the primary variable. But when we exchanged it, we got a nullptr value back, which means a producer thread is currently busy with the buffer that was supposed to be there. When the producer thread is finished writing to the buffer and tries to write the pointer back to the variable, it will detect a non-null value there (the new clean/empty buffer), and will instead put its buffer address in secondary. Which is what we are waiting for!
More clarification
There are 2 pointers: primary and secondary. Each one holds the address of one of the buffers.

The presence of a nullptr in primary means the buffer is being used (locked; yes, I know I said this is `lock-free`, but that's not really related). An address in the variable means the buffer is available for use!
Normally, when a calling thread (producer) wants to write data to the queue, it will take ownership of the primary buffer by removing the address (xchg) and exchanging it with a nullptr; then write its data to the buffer, and put the address back in the primary variable! While it has ownership, the value in the primary variable is nullptr. And when it's finished with the buffer, it just puts the pointer back in primary.

However, it's possible that the consumer thread has finished reading/calling/processing all the command handlers (callbacks) in the command queue it currently owns. When it's finished, it resets the buffer and exchanges it with the one in the primary variable; but if there was a nullptr in the primary variable, it means another thread is writing new commands.
if ( buffer->used )
{
    ...
}
else if ( this->shutdown )
    break;
If there is no data in the command queue buffer, then we test the `exit thread` shutdown flag! We only test it during `downtime`, when the thread has nothing else to do!
char* base_addr = buffer->commands;
The reason why base_addr is a char* is because it gives us byte-size flexibility with pointer arithmetic!

e.g. base_addr + sizeof( PFNCommandHandler* ) ... moves over the callback*, to the next value!
const char* end = buffer->commands + buffer->used;
Just store the end address for the loop! Reduces the instruction count by 2 in the inner loop!
The inner loop
do
{
    ( *(PFNCommandHandler*) base_addr )( base_addr + sizeof( PFNCommandHandler* ) + sizeof( uint32_t ) );
    base_addr += ( *(uint32_t*) ( base_addr + sizeof( PFNCommandHandler* ) ) );
}
while ( base_addr < end );
(*(PFNCommandHandler*) base_addr)(base_addr + sizeof( PFNCommandHandler* ) + sizeof( uint32_t ));
We can finally start calling the command handlers, and pass them a pointer to their input data!

Remember the structure: [command*][uint32_t][data]

input void* data address = base_addr + sizeof( command* ) + sizeof( uint32_t )
base_addr += ( *( uint32_t* ) ( base_addr + sizeof( PFNCommandHandler* ) ) );
Move to the next address. The [uint32_t] value in our command structure gives us: data size + 12 bytes (x64), which is the offset to the next command!

Storing the + 12 bytes (8-byte callback* + sizeof( uint32_t )) (8 bytes on 32-bit) directly in our offset index actually reduces the instruction count of the inner loop by 1! It's pre-calculated by the producer threads, which already do this calculation when they write the used value of the buffer structure, so it's a win-win!
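A worked example of that offset arithmetic (my own numbers, x64 assumed):

// A command whose handler takes 16 bytes of input data (two 8-byte pointers):
//
//   offset  0 : handler*  ( 8 bytes )
//   offset  8 : uint32_t  ( 4 bytes )  <- stores 8 + 4 + 16 = 28
//   offset 12 : data      (16 bytes )
//
// base_addr += 28;  // lands exactly on the next command's handler pointer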
while ( base_addr < end );
end will be the address just after our last command; this would have been the next address for producer threads to `append` new commands at, if we hadn't acquired the buffer for processing.
buffer->used = 0;
At this point we have completed executing all the commands in the command queue.

This is where we actually reset / recycle this buffer; it's just a very simple index variable we set!

After this, the buffer here is rotated / exchanged with the one in the primary variable.
Rinse and repeat!
producer thread
This is any thread that adds commands to a command queue.
command = command handler (function pointer) + (optional) input required by the function
Low-level queue management functions
These are the only functions you need on the producer thread side to add commands to a command queue!
acquireBuffer()
Take ownership of a command queue buffer.
queue_buffer_t* acquireBuffer()
{
    queue_buffer_t* result;
    while ( ( result = primary.exchange( nullptr ) ) == nullptr )
        ;   // spin: another thread currently owns the primary buffer
    return result;
}
releaseBuffer()
void releaseBuffer( queue_buffer_t* buffer )
{
    queue_buffer_t* exp = nullptr;
    if ( !primary.compare_exchange_strong( exp, buffer ) )
        secondary = buffer;   // primary already holds a fresh buffer; hand ours to the consumer
    this->cvDequeue.notify_one();
}
Called when the producer thread is finished writing commands.
Scenario: The producer thread was busy writing commands to the command queue buffer originally in primary; while this was happening, the consumer thread finished processing a queue, so it put a recycled queue in the primary variable. We detect this situation when we try to write our value back to primary: the presence of a new value there means primary != nullptr. So now we need to write our value to the secondary variable, because the consumer thread is waiting for it there!
At the same time, on the other side, the consumer thread is waiting, this is the code on the consumer thread side:
corresponding code from consumer thread loop
buffer = primary.exchange( buffer );
while ( buffer == nullptr )
    buffer = secondary.exchange( nullptr );
The first line writes a new value to the primary variable; this is detected by the scenario explained above!

The consumer thread will also detect that there was a nullptr value in the primary variable, and wait for the producer thread to execute releaseBuffer(); when it does, it will write its value into the secondary variable!
allocCommand()
template< typename TCB >
char* allocCommand( queue_buffer_t* buffer, const TCB function, const uint32_t size )
{
    const uint32_t base = buffer->used;
    const uint32_t reserved = sizeof( TCB* ) + sizeof( uint32_t ) + size;
    buffer->used += reserved;
    if ( buffer->used > buffer->size )
    {
        do buffer->size *= 2;
        while ( buffer->used > buffer->size );
        buffer->commands = (char*) ::realloc( buffer->commands, buffer->size );
    }
    char* command = &buffer->commands[ base ];
    *( (TCB*) command ) = function;
    *( (uint32_t*) ( command + sizeof( TCB* ) ) ) = reserved;
    return command + sizeof( TCB* ) + sizeof( uint32_t );
}
This is the command queue equivalent of malloc(); it returns a pointer to a reserved space of size bytes.

commands sit on a command queue, which is a much larger pre-allocated buffer. So these allocations just reserve space on this dynamic buffer. If the buffer doesn't have enough space, it grows dynamically with realloc().
And again, similar to the internals of malloc(), it also writes some hidden data to the memory location just before the returned address. It writes the function pointer address and the total size used by the command, then returns the address of the byte immediately after the 4 bytes of the uint32_t.

General structure: [function*][uint32 size][reserved data bytes][next function* ...
Note: Each command can only call this function once! So the required size must be predetermined!
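To illustrate how these three calls combine, here is a hypothetical raw command of my own, imagined as a member of CommandQueue so acquireBuffer()/allocCommand()/releaseBuffer() are in scope; cmdPrintf matches the PFNCommandHandler signature, so no stub is involved:

static void cmdPrintf( void* data ) { printf( *(const char**) data ); }

// Roughly what a raw, stub-less command boils down to:
void rawHelloWorld()
{
    queue_buffer_t* buffer = acquireBuffer();                              // take exclusive ownership
    char* data = allocCommand( buffer, cmdPrintf, sizeof( const char* ) ); // reserve header + 8 bytes
    *(const char**) data = "Hello World";                                  // write the input value
    releaseBuffer( buffer );                                               // publish and notify
}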
execute()
These functions have one very long line, which I cannot format nicely, so I just broke it up into 2 lines.

There are 7 of these functions; I'm not going to display all of them. They support from zero to six (0-6) parameters.
execute() template taking 2 parameters
template< typename TCB, typename T1, typename T2 >
void execute( const TCB function, const T1 v1, const T2 v2 )
{
    queue_buffer_t* buffer = acquireBuffer();
    char* data = allocCommand( buffer, executeStubV2< TCB, T1, T2 >,
        sizeof( PFNCommandHandler* ) + sizeof( TCB* ) + sizeof( T1 ) + sizeof( T2 ) );
    *( (TCB*) data ) = function;
    *( (T1*) ( data + sizeof( TCB* ) ) ) = v1;
    *( (T2*) ( data + sizeof( TCB* ) + sizeof( T1 ) ) ) = v2;
    releaseBuffer( buffer );
}
template< typename TCB, typename T1, typename T2 >
TCB = Typename/Template Callback - just a generic name I used for the function calls
void execute( const TCB function, const T1 v1, const T2 v2 )
what can I say?
queue_buffer_t* buffer = acquireBuffer();
`acquires` (takes ownership of) a buffer for exclusive use; the buffer is `acquired` for a very short time, only a few instructions, unless the buffer has to be resized
char* data = allocCommand( buffer, executeStubV2< TCB, T1, T2 >,
Part 1: we call allocCommand(), one of the `low level` calls, which is like a call to malloc(), because it returns a pointer to the data section which we can write data to!

executeStubV2() is the address of a special internal handler function, which will `unpack` the values from the command queue buffer on the other side, and actually make the function call. executeStubV2() is also a template function, taking the same parameters. The address of this function is written directly to the command queue buffer inside the call to allocCommand().
sizeof( PFNCommandHandler* ) + sizeof( TCB* ) + sizeof( T1 ) + sizeof( T2 ) );
Part 2: We are reserving this much space on the command queue buffer. If there is not enough space to satisfy this request, then the buffer is resized with realloc()
*( ( TCB* ) data ) = function;
The `function` that YOU want to call is actually the first thing written to the data section of the buffer!
*( ( T1* ) ( data + sizeof( TCB* ) ) ) = v1;
*( ( T2* ) ( data + sizeof( TCB* ) + sizeof( T1 ) ) ) = v2;
After writing the function, we write the input values (parameters) required for the function.
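Only the returnStubV2() below is shown in the article; by symmetry with the writes above, the matching executeStubV2() presumably looks something like this (my reconstruction, not code from the download):

template< typename TCB, typename T1, typename T2 >
static void executeStubV2( char* data )
{
    // unpack the function pointer and both parameters, then make the call
    const TCB function = *( (TCB*) data );
    const T1 v1 = *( (T1*) ( data + sizeof( TCB* ) ) );
    const T2 v2 = *( (T2*) ( data + sizeof( TCB* ) + sizeof( T1 ) ) );
    function( v1, v2 );
}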
returns()
The structure of `returns()` calls is almost identical to execute()!
template< typename TCB, typename R, typename T1, typename T2 >
void returns( const TCB function, const R ret, const T1 v1, const T2 v2 )
{
    queue_buffer_t* buffer = acquireBuffer();
    char* data = allocCommand( buffer, returnStubV2< TCB, R, T1, T2 >,
        sizeof( PFNCommandHandler* ) + sizeof( TCB* ) + sizeof( R ) + sizeof( T1 ) + sizeof( T2 ) );
    *( (TCB*) data ) = function;
    *( (R*) ( data + sizeof( TCB* ) ) ) = ret;
    *( (T1*) ( data + sizeof( TCB* ) + sizeof( R ) ) ) = v1;
    *( (T2*) ( data + sizeof( TCB* ) + sizeof( R ) + sizeof( T1 ) ) ) = v2;
    releaseBuffer( buffer );
}
The main differences are the use of `R` for the return address type, and a slightly different order of writing data: we write the `R ret` value to the buffer directly after writing the function pointer, but before writing the parameters!

This also forces us to recalculate the positions of the input values; note the sizeof( R ) when writing T1 and T2!
And the corresponding returnStubV2() function on the other side will `unpack` these values!
returnStubV#()
template< typename TCB, typename R, typename T1, typename T2 >
static void returnStubV2( char* data )
{
    const TCB function = *( (TCB*) data );
    const T1 v1 = *( (T1*) ( data + sizeof( TCB* ) + sizeof( R ) ) );
    const T2 v2 = *( (T2*) ( data + sizeof( TCB* ) + sizeof( R ) + sizeof( T1 ) ) );
    **( (R*) ( data + sizeof( TCB* ) ) ) = function( v1, v2 );
}
There are 7 of these functions; I'm just going to show the one corresponding to the returns() above!
template< typename TCB, typename R, typename T1, typename T2 >
Pretty much the same template as the returns()
static void returnStubV2( char* data )
`should` be static; this function will be called by the internal thread.

Very important to note is the char* data. Since this is a `command handler`, and it will be called by the internal consumer thread, it requires a specific pre-defined structure, which includes accepting one pointer. The reason why I use char* is because it makes pointer addition (sizeof()) a lot easier!
const TCB function = *( ( TCB* ) data );
We extract the function address from the data pointer
const T1 v1 = *( ( T1* ) ( data + sizeof( TCB* ) + sizeof( R ) ) );
const T2 v2 = *( ( T2* ) ( data + sizeof( TCB* ) + sizeof( R ) + sizeof( T1 ) ) );
Extract 2x parameters, since this is the `2 parameter` version
**( ( R* ) ( data + sizeof( TCB* ) ) ) = function( v1, v2 );
This is where we actually make the function call, and we write the return value directly to the address stored in the buffer. This might seem a bit complicated, but it's not really. On the data buffer, after sizeof( TCB* ) is the address of the return value (R) we stored.
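Putting it together with the Preface's fopen example, the bytes for one returns() command would hold (my annotation, x64 assumed):

// commandQ.returns( fopen, &f, "image.png", "rb" );
// TCB = FILE* (*)( const char*, const char* ), R = FILE**
//
// [ returnStubV2<TCB,R,T1,T2>* ]  stub address, called by the consumer thread
// [ uint32_t                   ]  offset to the next command
// [ fopen                      ]  your function        <- offset 0 of `data`
// [ FILE** (&f)                ]  where to write the return value
// [ const char* "image.png"    ]  v1
// [ const char* "rb"           ]  v2
//
// and the stub finally executes:  *(&f) = fopen( "image.png", "rb" );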
The end
Rant
Sorry, but I gotta get this off my chest! This could be an entire article by itself!
Disclaimer: I just wish more programmers would be motivated to write faster executing code, but we don't all share the same goals or the same level of knowledge, and I know there are external pressures on programmers to deliver on deadlines, I just wish it wasn't so!
about other multi-threaded code
What really prompted me to release my code, was looking at several articles and source code on the same subject; they just made me sick! I'm sorry to say it, but there's no polite way for me to explain what I felt when I saw the vile abominations, excrement and violations of something so sacred to me! Not only does the source code look like an absolute abomination from hell, but the entire concept is a violation of everything I hold dear, especially about high performance programming! Sure, you want to use a ring-buffer to `improve performance`, and possibly reduce memory consumption, I get it! But it's not only the buffer that's the problem, your entire implementation is flawed and slow as f%#$? What's the point of trying to make something perform faster, when you only deal with half the problem, because your entire design is flawed and suffers from massive conceptual bottlenecks, on a colossal scale? If you are going to take the time to show your readers how a ring buffer works, because you think it's necessary to speed up your implementation, then don't be ignorant of the fact that the rest of your code is a travesty! For crying out loud, learn how to optimize the rest of it! You don't need a damn ring buffer! You need to learn how things actually work at a lower level!!! And for those implementing std::queue/std::vector for their execution queues, I just have nothing good to say about that! When you think about multi-threading, you are thinking about `performance`, aren't you? Isn't that the whole point? So why the %#$@ would you send your whole function queue through a meat grinder, piece of crap, slow as a mule, thing like an std::queue? Don't you know anything about writing optimized code yourself? Have you actually looked inside the internals of that abomination? Don't you have any respect for your code? Sorry, this was aimed at other people writing articles on this subject, `promoting` concepts, ideas and code that I don't agree with! Each to their own! Not personal!
about the state of programming in general
Where have all the low-level programmers gone? Why are there so few articles being released about lower-level techniques these days? Have they all been discovered already? Why does it seem like everyone is jumping on the next bandwagon language or C++ revision, is it really the holy-grail? Doesn't anyone stop to think or examine what they're doing or using, or do they all just blindly accept and stay away from peeking inside those vile internals? "I need to do xyz", and the answer is: "just use Boost"! Do you know why std::string uses 40-bytes of overhead? Why do I feel the necessity to put "low-level" in the title, just so mainstream/`modern` programmers won't criticise my use of function pointers and raw pointer arithmetic, are they really that detestable? Is nobody interested in these techniques anymore? These concepts aren't even that low-level to me, and I should imagine that every professional C/C++ programmer should be comfortable with them! And why does my damn Firefox browser currently need 80 threads and 2GB of RAM to open 6 tabs? Does Gmail and YouTube really require that much to function? And for that matter, why does Google Chrome currently need 7 processes, 99 threads and 700MB RAM to open a single Gmail tab??? I literally just counted it! And I can't even tell if my Gmail tab is running on the process with 45 threads or the one using 350MB of RAM! For crying out loud people, it's just Gmail! Do you know what I can do with 99 threads and 700MB of RAM? Does a &^$%@&ing Gmail tab really need all that??? 99 threads!!! If anyone working on Firefox or V8 actually gave a f*** about true performance, I would have a faster browsing experience! I constantly need to close tabs because Chrome and Firefox consume all 8GB of my RAM! Why is it widely perceived that Moore's law is still in effect, but it doesn't feel like it, because everyone just keeps adding more and more layers of abstraction, abstraction on top of abstraction; "hey look at this, my abstraction uses another layer of abstraction!" Are these concepts really that hard to grasp and work with? Raise the bar people, and let's get back to being hackers in the original sense of the word!
Ok ... rant over! Whew!
Assembler output of consumer thread loop
This is from my Windows API version; I just thought I would leave it here for reference. The C++11 version is similar.
while ( true )
{
commandQ = (command_queue_t*) InterlockedExchangePointer( &primary, commandQ );
13F9ECDE0 48 87 1D E9 B0 00 00 xchg rbx,qword ptr [primary (013F9F7ED0h)]
while ( commandQ == nullptr )
13F9ECDE7 48 85 DB test rbx,rbx
13F9ECDEA 75 13 jne (013F9ECDFFh)
13F9ECDEC 0F 1F 40 00 nop dword ptr [rax]
{
commandQ = (command_queue_t*) InterlockedExchangePointer( &secondary, nullptr );
13F9ECDF0 48 8B DD mov rbx,rbp
13F9ECDF3 48 87 1D CE B0 00 00 xchg rbx,qword ptr [(013F9F7EC8h)]
13F9ECDFA 48 85 DB test rbx,rbx
13F9ECDFD 74 F1 je (013F9ECDF0h)
}
if ( commandQ->used )
13F9ECDFF 8B 43 0C mov eax,dword ptr [rbx+0Ch]
13F9ECE02 85 C0 test eax,eax
13F9ECE04 74 20 je (013F9ECE26h)
{
char* base_addr = commandQ->commands;
13F9ECE06 48 8B 3B mov rdi,qword ptr [rbx]
const char* end = commandQ->commands + commandQ->used;
13F9ECE09 48 8D 34 07 lea rsi,[rdi+rax]
13F9ECE0D 0F 1F 00 nop dword ptr [rax]
do
{
( *(PFNCommandCallback*) base_addr )( base_addr + sizeof(PFNCommandCallback*) + sizeof(UINT) );
13F9ECE10 48 8D 4F 0C lea rcx,[rdi+0Ch]
13F9ECE14 FF 17 call qword ptr [rdi]
base_addr += ( *( UINT* ) ( base_addr + sizeof( PFNCommandCallback* ) ) );
13F9ECE16 8B 47 08 mov eax,dword ptr [rdi+8]
13F9ECE19 48 03 F8 add rdi,rax
}
while ( base_addr < end );
13F9ECE1C 48 3B FE cmp rdi,rsi
13F9ECE1F 72 EF jb (013F9ECE10h)
commandQ->used = 0;
13F9ECE21 89 6B 0C mov dword ptr [rbx+0Ch],ebp
13F9ECE24 EB BA jmp (013F9ECDE0h)
}
else if ( exitThread )
13F9ECE26 8B 05 20 B3 00 00 mov eax,dword ptr [(013F9F814Ch)]
13F9ECE2C 85 C0 test eax,eax
13F9ECE2E 74 B0 je (013F9ECDE0h)
break;
}