Use Both Cores on an ESP32: Easy Synchronization with Esp32SynchronizationContext

25 Feb 2021, MIT license, 11 min read
Take a page from .NET and enjoy an easy way to safely pass information between threads on an ESP32
Take a great idea from .NET and use it to open up the other ESP32 core you've let collect dust.

Esp32SynchronizationContext sample

Introduction

It's rough to debug IoT devices. Many do not have integrated debugger probes, and even the ones that do run over a slow interface like serial UART or, at best, JTAG. This means step-through debugging is either off the table or so slow as to be more painful than useful.

It's even worse to debug multithreaded code. Safely accessing data between threads is not for the faint of heart, and any wrong move can result in intermittent problems which are extremely difficult to track down, even on a full PC with an integrated debugging environment.

Forget about combining the two, especially given the ESP32's serial interface leading to long development and debug cycles. It's just not economical. Either that or you'll go nuts.

As a consequence, you've probably been running your fancy dual core ESP32 on a single core, leaving the other one to rot. You don't have to. What if I told you we could dramatically simplify general case synchronization, so you can freely create multithreaded code without all the fuss?

Conceptualizing this Mess

There are many ways to synchronize access to data such that it can safely be read and written from multiple threads. Some of them are easy to use, some aren't. Some are very general purpose, but most are quite specific to what you are doing.

For the core of our synchronization, we'll be using message passing. We'll be using a thread safe ring buffer to queue messages. Messages can be queued by any thread and will be retrieved to be processed on a target thread - usually the main application's thread.

If thread A wants to send a message to thread B, they both must have access to the message ring buffer R. Thread A sends a message into the buffer R, while Thread B is typically looping, retrieving messages as they become available. Both sending and retrieving messages are thread safe operations.
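
To make the arrangement above concrete, here is a minimal desktop sketch in portable C++. It uses std::mutex and std::condition_variable instead of the FreeRTOS ring buffer we'll use later, and the names MessageBuffer, send(), receive(), and relayMessages() are made up for illustration:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the shared buffer R: a mutex-protected FIFO.
class MessageBuffer {
public:
    // Called by any thread (thread A) to queue a message.
    void send(int message) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push_back(message);
        }
        m_available.notify_one();
    }
    // Called by the target thread (thread B); blocks until a message arrives.
    int receive() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_available.wait(lock, [this] { return !m_items.empty(); });
        int message = m_items.front();
        m_items.pop_front();
        return message;
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_available;
    std::deque<int> m_items;
};

// Thread A sends `count` messages; the calling thread plays thread B.
std::vector<int> relayMessages(int count) {
    MessageBuffer r;
    std::thread a([&r, count] {
        for (int i = 0; i < count; ++i) r.send(i);
    });
    std::vector<int> received;
    for (int i = 0; i < count; ++i) received.push_back(r.receive());
    a.join();
    return received;
}
```

With a single sender, the messages come out in the order they went in, which is the property the rest of this article leans on.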

That's all well and good as an underlying mechanism, but it could stand some simplification. In this case, we'll simplify it by using a single kind of message, that does one thing. That one thing is pretty flexible though.

Before we get too deep into it, we're going to explore a clever .NET orchestration of message passing that accomplishes what we want, before adapting it for the ESP32.

Stealing from Microsoft

In .NET, Microsoft introduced the SynchronizationContext. It's basically a thread safe message passing implementation whose messages are delegates. Using this, you can post an anonymous method from Thread A to be called by Thread B, effectively causing any target code you desire to be executed on the target thread (B) rather than the current thread (A).

Normally, when we think of synchronization, we think of creating read and write barriers around data, but in this paradigm, we're sidestepping that form of synchronization altogether. In the alternative, we're simply going to dispatch code from one thread to be executed on the other thread. We can use this code to transmit results, statuses, and notifications from our thread's operation.

This makes executing code that safely updates the UI from a secondary thread, for example, quite easy to write. Here's an example of using one from a .NET console application, though you'll most often find them in Windows Forms or WPF applications:

C#
static MessagingSynchronizationContext _syncContext = new MessagingSynchronizationContext();
static ulong _count;
static void Main()
{
    ThreadPool.QueueUserWorkItem((state) => {
        while(true)
        {
            // This thread just posts Hello World 1! and a count
            // to the target thread over and over again, delaying
            // by 3/4 of a second each time

            // infinite loop
            // use Post() to execute code on the target thread
            // - does not block here
            _syncContext.Post(new SendOrPostCallback((state2) => {
                // BEGIN EXECUTE ON TARGET THREAD
                Console.WriteLine("Hello World 1! Count: {0}", _count);
                // normally we couldn't access _count
                // from a different thread safely
                // but this always runs on the target 
                // thread so we're okay since _count
                // never gets touched by any other
                // thread
                ++_count;
                // END EXECUTE ON TARGET THREAD
            }),null);
            // EXECUTES ON THIS THREAD:
            Thread.Sleep(750);
        }
    });
    ThreadPool.QueueUserWorkItem((state) => {
        while (true)
        {
            // This thread just sends Hello World 2! and a count
            // to the target thread over and over again, delaying
            // by 1 second each time

            // infinite loop
            // use Send() to execute code on the target thread
            // - blocks here until the target function returns
            _syncContext.Send(new SendOrPostCallback((state2) => {
                // BEGIN EXECUTE ON TARGET THREAD
                Console.WriteLine("Hello World 2! Count: {0}", _count);
                // normally we couldn't access _count
                // from a different thread safely
                ++_count;
                // END EXECUTE ON TARGET THREAD
            }), null);
            // EXECUTES ON THIS THREAD:
            Thread.Sleep(1000);
        }
    });

    // start the main message loop:
    _syncContext.Start();
}

Running this will give you something like:

Hello World 1! Count: 0
Hello World 2! Count: 1
Hello World 1! Count: 2
Hello World 2! Count: 3
Hello World 1! Count: 4
Hello World 2! Count: 5
Hello World 1! Count: 6
Hello World 1! Count: 7
Hello World 2! Count: 8
...

Here, the key is we have two threads accessing _count and writing to the Console, right?

No, we do not. All of the code inside the lambdas passed to _syncContext.Send() and _syncContext.Post() is actually dispatched on the thread _syncContext.Start() was called from.

This works because Send() and Post() don't actually execute the delegates they are given. Instead, they package them up as a message and put them in the message queue. Meanwhile, Start() is spinning a loop behind the scenes, retrieving messages from the queue and then calling the delegates they contain!

Because of this, the delegates are only getting executed on one thread, and in the order they appear in the queue. The trick, then, is to do most of your work in the secondary thread, and then use Send() or Post() to update the main thread with the results of your long running operation.

The difference between Send() and Post() is Send() blocks until the delegate is executed on the target thread and returns. Send() is actually more work for the CPU to do than a fully asynchronous Post(), so use Post() if you can get away with it.
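
Here is a rough C++ sketch of that difference, ahead of the real implementation. Everything in it (MiniContext, runOne(), demoPostAndSend()) is a hypothetical desktop stand-in built on the standard library, not the class this article ships:

```cpp
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical miniature: callbacks are queued, not executed, until the
// target thread calls runOne().
class MiniContext {
public:
    // Queue the callback and return immediately.
    void post(std::function<void()> fn) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(std::move(fn));
    }
    // Queue the callback, then block until the target thread has run it.
    void send(std::function<void()> fn) {
        auto done = std::make_shared<std::promise<void>>();
        std::future<void> finished = done->get_future();
        post([fn, done] { fn(); done->set_value(); });
        finished.wait();
    }
    // Called on the target thread: run one queued callback, if any.
    bool runOne() {
        std::function<void()> fn;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_queue.empty()) return false;
            fn = std::move(m_queue.front());
            m_queue.pop();
        }
        fn();
        return true;
    }
private:
    std::mutex m_mutex;
    std::queue<std::function<void()>> m_queue;
};

// A worker posts once and sends once; the calling thread dispatches.
// Returns a counter that only the dispatching thread ever modifies.
int demoPostAndSend() {
    MiniContext ctx;
    int count = 0;
    std::atomic<bool> workerDone{false};
    std::thread worker([&] {
        ctx.post([&count] { ++count; });  // returns at once
        ctx.send([&count] { ++count; });  // returns after it has run
        workerDone.store(true);
    });
    while (!workerDone.load()) ctx.runOne();
    worker.join();
    return count;
}
```

The extra promise/future pair in send() is the additional work mentioned above. Note that in this sketch, calling send() from the dispatching thread itself would wait forever, since nobody would be left to run the callback.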

Note that Send() and Post() are the only members we've covered defined by SynchronizationContext itself. The rest of the members are implementation specific, and in this case, they are specific to my custom SynchronizationContext implementation called MessagingSynchronizationContext.

That's great, but that's .NET. We're not dealing with .NET here, but a little theft goes a long way. We're going to recreate this concept using the Arduino framework and FreeRTOS running on an ESP32. In the process, we'll be producing something very similar to MessagingSynchronizationContext, but for the ESP32 in C++, and "Arduinoized."

The first thing I'm going to do is take you through the .NET implementation of the MessagingSynchronizationContext, since we'll be recreating it.

Coding this Mess

The MessagingSynchronizationContext class uses a MessageQueue to handle posting messages to a thread safe queue. We won't explore MessageQueue in detail because it's outside the scope here. All it is, is a thread safe queue that blocks until more messages are available.

All messages posted to the queue take the following form:

C#
private struct Message
{
    public readonly SendOrPostCallback Callback;
    public readonly object State;
    public readonly ManualResetEventSlim FinishedEvent;
    public Message
    (SendOrPostCallback callback, object state, ManualResetEventSlim finishedEvent)
    {
        Callback = callback;
        State = state;
        FinishedEvent = finishedEvent;
    }
    public Message(SendOrPostCallback callback, object state) : this(callback, state, null)
    {
    }
}

Here, Callback is a delegate that points to the code in our handler, which is usually a lambda. State is application defined state to be passed along with the call, which we don't use. FinishedEvent is used for signalling when the Callback delegate is done executing. This is used by Send(), but not by Post(), where it's always null.

The code for making post and send work is below:

C#
/// <summary>
/// Sends a message and does not wait
/// </summary>
/// <param name="callback">The delegate to execute</param>
/// <param name="state">The state associated with the message</param>
public override void Post(SendOrPostCallback callback, object state)
{
    _messageQueue.Post(new Message(callback, state));
}
/// <summary>
/// Sends a message and waits for completion
/// </summary>
/// <param name="callback">The delegate to execute</param>
/// <param name="state">The state associated with the message</param>
public override void Send(SendOrPostCallback callback, object state)
{
    var ev = new ManualResetEventSlim(false);
    try
    {
        _messageQueue.Post(new Message(callback, state, ev));
        ev.Wait();
    }
    finally
    {
        ev.Dispose();
    }
}

The Post() method is pretty straightforward. Send() is almost as straightforward, but it has additional code to wait, and then dispose of the message's associated FinishedEvent.

Here's a primary Start() implementation. This is where the messages get dispatched and the delegates executed:

C#
/// <summary>
/// Starts the message loop
/// </summary>
public void Start()
{
    while (Step()) ;
}
public bool Step()
{
    if (_messageQueue.IsEmpty)
        return true;
    // blocks until a message comes in:
    Message msg = _messageQueue.Receive();
    // execute the code on this thread
    msg.Callback?.Invoke(msg.State);
    // let Send() know we're done:
    if (null != msg.FinishedEvent)
        msg.FinishedEvent.Set();
    return null != msg.Callback;
}

Here, Start() calls Step() in a loop until it gets a false result. If the queue is empty, Step() returns true right away so the loop keeps going. Otherwise, it pulls the next Message out of the queue, executes the Callback delegate if there is one, and then, if there's a FinishedEvent (indicating Send() was called), sets it, allowing the Wait() from earlier to complete. If there was no delegate, false is returned, which indicates the message was a "quit" message, a special message that gets posted when Stop() is called. This allows you to call Stop() from another thread to exit the loop.
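
The "quit" message idea carries over to C++ almost unchanged, because an empty std::function can play the role of the null delegate. The following is only a sketch under that assumption; LoopContext and demoStop() are invented names, and unlike the C# version this step() simply blocks until a message arrives:

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

class LoopContext {
public:
    void post(std::function<void()> fn) { push(std::move(fn)); }
    // Post an empty callback; the loop treats it as "quit".
    void stop() { push(nullptr); }
    // Run callbacks on the calling thread until the quit message arrives.
    // Returns how many callbacks were executed.
    int start() {
        int executed = 0;
        while (step()) ++executed;
        return executed;
    }
private:
    void push(std::function<void()> fn) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(fn));
        }
        m_ready.notify_one();
    }
    bool step() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return !m_queue.empty(); });
        std::function<void()> fn = std::move(m_queue.front());
        m_queue.pop();
        lock.unlock();
        if (!fn) return false;  // empty callback: this is the quit message
        fn();
        return true;
    }
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<std::function<void()>> m_queue;
};

// A worker posts three callbacks and then asks the loop to stop.
int demoStop() {
    LoopContext ctx;
    std::thread worker([&ctx] {
        for (int i = 0; i < 3; ++i) ctx.post([] {});
        ctx.stop();
    });
    int executed = ctx.start();
    worker.join();
    return executed;
}
```

Because the quit message travels through the same queue as everything else, callbacks posted before stop() still run before the loop exits.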

That's really all the magic that's involved. Now let's recreate it for the ESP32 in C++.

The ESP32 Rendition

We'll have to do a bit of spelunking into FreeRTOS, which is the real-time OS used by the ESP32 to handle thread scheduling, basic I/O, and things like that. It's not the ESP-IDF, but if you use the ESP-IDF, you will likely use FreeRTOS calls in the same code. When you are using the Arduino framework on the ESP32, you are also using the ESP-IDF and FreeRTOS under the covers, by way of the Arduino code that wraps it. In this case, we're just going to use some of it directly, since the Arduino framework isn't particularly thread aware, nor does it provide access to the nifty circular buffer implementation we'll be using, as far as I know. Luckily, the stuff we're using from it, while a bit clunky if you're not used to it, is simple!

Our Esp32SynchronizationContext class will use a FreeRTOS based circular buffer for what we used MessageQueue for above, and the FreeRTOS "tasks" API to handle the heavy lifting.

Don't confuse FreeRTOS tasks here with the .NET Task class. They're much different beasts. Tasks in FreeRTOS are basically either fibers (cooperatively scheduled) or threads (pre-emptively scheduled by the OS or running on another core). We'll be using them as threads.

Realtime Wrinkle: Timeouts

We're going to try to keep the code and concepts pretty close to each other. One significant difference, however, is that a real-time OS must guarantee latencies, or at least maximum latencies, for pretty much anything it does. That means you generally shouldn't wait forever for something to complete; blocking calls take a timeout. I've added timeout parameters where appropriate. In one instance, that makes things interesting.
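
To illustrate what "wait, but only for so long" looks like, here is a small portable C++ sketch of a signal whose wait gives up after a deadline. TimedSignal is a hypothetical class for illustration only; the ESP32 code later uses FreeRTOS calls with tick-based timeouts instead:

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical one-shot signal with a bounded wait.
class TimedSignal {
public:
    // Mark the signal as set and wake any waiter.
    void set() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_set = true;
        }
        m_cv.notify_all();
    }
    // Returns true if the signal was set within timeoutMs, false on timeout.
    bool wait(unsigned timeoutMs) {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_cv.wait_for(lock, std::chrono::milliseconds(timeoutMs),
                             [this] { return m_set; });
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_set = false;
};
```

The important part is the return value: the caller has to be prepared for the wait to fail, which is why the ESP32 methods below return bool rather than void.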

Arduinoisms: Lifetime and Updating

It's typical with Arduino libraries to forgo using the C++ RAII pattern and instead use a begin() method to do primary initialization, possibly taking initialization parameters. Whatever you or I may think of this, it's how things are typically done with the Arduino code and what people usually expect. This method can sometimes be accompanied by an end() method that tears down. Sometimes, libraries don't bother since these platforms don't have a graceful shutdown mechanism in the framework. The begin() method is usually called in setup(). If a library is cooperatively "threaded", it will probably need some CPU during the loop() call as well. I don't know that there's a standard method name for this but my classes that use the begin()/end() paradigm also use update() if they need to have something run inside loop().

Esp32SynchronizationContext is no exception to the above. If you want to use a synchronization context in your code's main thread, then use begin() - usually in setup() - to initialize the synchronization context. Use end() if you want to deinitialize it, although this may never need to be called depending on your situation. Call update() inside loop().

You can use the synchronization context to target other threads as well. Just call update() in the appropriate thread's main loop. You usually won't need that unless your scenario is much more complicated than you usually need for an IoT device.

Revisiting the Initial Example, ESP32 Style

Here's the ESP32 sample code that does the equivalent to the first bit of C# code we explored at the top of the article:

C++
#include <Arduino.h>
#include "Esp32SynchronizationContext.h"

// use this to synchronize calls by executing functors on the target thread
Esp32SynchronizationContext g_mainSync;
// just something we can increment
unsigned long long g_count;
void thread1(void * state){
  // This task just posts Hello World 1! and a count
  // to the target thread over and over again, delaying
  // by 3/4 of a second each time

  // infinite loop or stop if error
  // use post() to execute code on the target thread
  // - does not block here
  while(g_mainSync.post([](void*state){
    // BEGIN EXECUTE ON TARGET THREAD
    Serial.printf("Hello world 1! - Count: %llu\r\n",g_count);
    // normally we couldn't access g_count
    // from a different task/thread safely
    // but this always runs on the target 
    // thread so we're okay since g_count
    // never gets touched by any other
    // thread
    ++g_count;
    // END EXECUTE ON TARGET THREAD
    })) {
      // EXECUTES ON THIS THREAD:
      delay(750);
    }
    
  // never executes unless error, but if 
  // we get here, delete the task
  vTaskDelete( NULL );
}
void thread2(void * state){
  // This task just sends Hello World 2! and a count
  // to the target thread over and over again, delaying
  // by 1 second each time

  // infinite loop or stop if error
  // use send() to execute code on the target thread
  // - blocks here until method returns
  while(g_mainSync.send([](void*state){
    // BEGIN EXECUTE ON TARGET THREAD
    Serial.printf("Hello world 2! - Count: %llu\r\n",g_count);
    // normally we couldn't access g_count
    // from a different task/thread safely
    ++g_count;
    // END EXECUTE ON TARGET THREAD
    })) {
      // EXECUTES ON THIS THREAD:
      delay(1000);
    }
    
  // never executes unless error, but if 
  // we get here, delete the task
  vTaskDelete( NULL );
}

void setup()
{
  g_count = 0;
  Serial.begin(115200);
  // initialize our synchronization context
  if(!g_mainSync.begin()) {
    Serial.println("Error initializing synchronization context");
    while(true);          // halt
  }
  // create a task on the first core (the one that FreeRTOS runs on)
  xTaskCreatePinnedToCore(
    thread1,              // Function that should be called
    "Message feeder 1",   // Name of the task (for debugging)
    1000,                 // Stack size (bytes)
    NULL,                 // Parameter to pass
    1,                    // Task priority
    NULL,                 // Task handle
    0                     // core
  );
  // create a task on the second core (the one setup()/loop() run on, 
  // and the one the Arduino framework runs on)
  xTaskCreatePinnedToCore(
    thread2,              // Function that should be called
    "Message feeder 2",   // Name of the task (for debugging)
    1000,                 // Stack size (bytes)
    NULL,                 // Parameter to pass
    1,                    // Task priority
    NULL,                 // Task handle
    1                     // core
  );
}

void loop()
{
  // This simply dispatches calls made by send() or post()
  // by executing them here. Note that long running methods
  // or a backlogged queue can cause this to block for a 
  // significant amount of time. Try to avoid putting long
  // running calls into the synchronization context themselves
  // that's what tasks are for anyway.
  if(!g_mainSync.update()) {
    Serial.println("Could not update synchronization context");
  }
}

The overarching code is fundamentally the same. Where we used C# lambdas, we now use C++ lambdas. In C#, lambdas are backed by delegates; in C++, ours are backed by functors wrapped in std::function. The only real differences are that we aren't using exception handling, and that we've pinned our two threads to two different cores, whereas in the .NET rendition we left it to the ThreadPool to decide where each thread ran.
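
If functors are new to you, this short sketch shows that a hand-written functor and a lambda are interchangeable once they are stored in a std::function<void(void*)>, which is the callback type used throughout the ESP32 code. It also shows one way the state argument could be used, something the sample never does. Incrementer and demoFunctors() are illustrative names only:

```cpp
#include <functional>

// A hand-written functor: an object with operator().
struct Incrementer {
    void operator()(void* state) const { ++*static_cast<int*>(state); }
};

// Stores a functor and a lambda in the same callback type and calls both,
// passing a pointer to a counter as the state.
int demoFunctors() {
    int count = 0;
    std::function<void(void*)> fromFunctor = Incrementer{};
    std::function<void(void*)> fromLambda =
        [](void* state) { ++*static_cast<int*>(state); };
    fromFunctor(&count);
    fromLambda(&count);
    return count;
}
```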

Implementing Message (Again)

Let's take a look at Message, in C++ this time:

C++
struct Message {
    std::function<void(void*)> callback;
    void* state;
    TaskHandle_t finishedNotifyHandle;
};

This is very similar to what we had before. We're using std::function<void(void*)> instead of SendOrPostCallback, and void* instead of object for the state. For our finished signal, we're using this odd beast called a TaskHandle_t, which is essentially a thread id. FreeRTOS has a special synchronization primitive, the task notification, that is optimized for certain cases, and ours is one of those cases. Task notifications are lighter weight than semaphores or mutexes, and will allow us to signal in very much the same way we do with FinishedEvent. However, unlike a .NET ManualResetEvent, a notification must be directed at a particular thread, rather than any and all waiting threads. That serves us perfectly well here. If anything, it's better, because it's exactly what we want and no more: there will only ever be one thread waiting on this finished notification, and that's the thread that called send()/Send().

Sending and Posting, The Ring Buffer Way

Let's look at send and post again, this time using the FreeRTOS ring buffer API.

C++
// posts a message to the thread update() is called from. This method does not wait for the
// callback to run; it only blocks (up to timeoutMS) if the ring buffer is full
bool post(std::function<void(void *)> fn, void *state = nullptr, uint32_t timeoutMS = 10000)
{
    Message msg;
    msg.callback = fn;
    msg.state = state;
    msg.finishedNotifyHandle = nullptr;
    BaseType_t res = xRingbufferSend(
        m_messageRingBufferHandle, &msg, sizeof(msg), pdMS_TO_TICKS(timeoutMS));
    return (res == pdTRUE);
}
// sends a message to the thread update() is called from. 
// This method blocks until the update thread executes the method and it returns.
bool send(std::function<void(void *)> fn, void *state = nullptr, uint32_t timeoutMS = 10000)
{
    Message msg;
    msg.callback = fn;
    msg.state = state;
    msg.finishedNotifyHandle = xTaskGetCurrentTaskHandle();
    uint32_t mss = millis();
    BaseType_t res = xRingbufferSend(
        m_messageRingBufferHandle, &msg, sizeof(msg), pdMS_TO_TICKS(timeoutMS));
    mss = millis() - mss;
    if (timeoutMS >= mss)
        timeoutMS -= mss;
    else
        timeoutMS = 0;
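    if (res == pdTRUE)
    {
        // wait for update() to signal completion; the wait result is not
        // checked, so this returns true even if the wait itself times out
        ulTaskNotifyTake(pdTRUE, pdMS_TO_TICKS(timeoutMS));
        return true;
    }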
    return false;
}

post() is really simple and should be pretty self-evident, xRingbufferSend()'s odd types notwithstanding. Basically, we construct a message, and then post it to the ring buffer. It will block for a maximum of timeoutMS while waiting for more room in the ring buffer. After that, it fails. If this is happening in your code, the queue is most likely backed up because long running code is being posted or sent. Don't do that.
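
One caveat worth knowing about, offered as my reading of the code rather than something measured: the ring buffer copies the message as raw bytes, while std::function is not a trivially copyable type. Small capture-less lambdas like the ones in the sample tend to survive that, but a callback with larger captures may not. A possible alternative, sketched below in portable C++ with invented names (RawMessage, pack(), unpackAndRun()) and a plain byte array standing in for the ring buffer, is to send only a pointer to a heap-allocated callback:

```cpp
#include <cassert>
#include <cstring>
#include <functional>
#include <type_traits>
#include <utility>

using Callback = std::function<void(void*)>;

// std::function manages its own storage, so a byte-for-byte copy is not
// guaranteed to produce a valid object.
static_assert(!std::is_trivially_copyable<Callback>::value,
              "std::function is not trivially copyable");

// What travels through the byte-copying buffer in this sketch: plain
// pointers only, which are safe to memcpy.
struct RawMessage {
    Callback* callback;  // allocated by the sender, deleted by the receiver
    void* state;
};
static_assert(std::is_trivially_copyable<RawMessage>::value,
              "RawMessage is safe to memcpy");

// Sender side: move the callback to the heap and copy the pointer out.
void pack(unsigned char* slot, Callback fn, void* state) {
    RawMessage msg{new Callback(std::move(fn)), state};
    std::memcpy(slot, &msg, sizeof(msg));
}

// Receiver side: recover the pointer, run the callback, then free it.
void unpackAndRun(const unsigned char* slot) {
    RawMessage msg;
    std::memcpy(&msg, slot, sizeof(msg));
    assert(msg.callback != nullptr);
    (*msg.callback)(msg.state);
    delete msg.callback;
}
```

The trade-off is one heap allocation per message, which may or may not be acceptable on your device.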

send() is a bit more involved. It also has to grab the current thread's id, called a "task handle" so we can signal it later. Note our foolishness with the timeout. The idea here is we don't want the total time it takes to execute this to be longer than timeoutMS. That includes the time it takes to post a message to the ring buffer. Because of this, we have to subtract the time it took to post the message and use the result as a timeout for the completion signal.
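
The timeout arithmetic is easier to see in isolation. The two helpers below are hypothetical (they are not part of the class) and assume 32-bit millisecond counters, as on the ESP32:

```cpp
#include <cstdint>

// Milliseconds elapsed between two readings of a 32-bit millisecond counter.
// Unsigned subtraction still gives the right answer if the counter wrapped
// around between the two readings.
uint32_t elapsedMs(uint32_t startMs, uint32_t nowMs) {
    return nowMs - startMs;
}

// How much of the timeout budget is left after `elapsed` milliseconds.
// Clamps at zero rather than wrapping, since the values are unsigned.
uint32_t remainingTimeout(uint32_t timeoutMs, uint32_t elapsed) {
    return (timeoutMs >= elapsed) ? (timeoutMs - elapsed) : 0;
}
```

For example, with a 10000 ms budget and 250 ms spent getting into the ring buffer, 9750 ms remain for the completion signal. If posting alone used up the whole budget, the remaining wait is zero.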

ulTaskNotifyTake() is a fancy way of saying manualResetEvent.Wait().

Dispatches From a Ring Buffer

We're very nearly done. The last step is to process messages as they become available in the ring buffer, and execute the code they point to:

C++
// processes pending messages in the message queue. 
// This should be called in a loop on the target thread.
bool update()
{
    //Receive an item from no-split ring buffer
    size_t size = sizeof(Message);
    Message *pmsg = (Message *)xRingbufferReceive(m_messageRingBufferHandle, &size, 0);
    // no messages available, return success, doing nothing:
    if (nullptr == pmsg)
        return true;
    // something is very wrong:
    if (size != sizeof(Message))
        return false;
    Message msg = *pmsg;
    // tell the ring buffer we're done with the current message
    vRingbufferReturnItem(m_messageRingBufferHandle, pmsg);
    // call the callback, with the state as the argument
    msg.callback(msg.state);
    // if this was a send()...
    if (nullptr != msg.finishedNotifyHandle)
    {
        // signal that it's finished
        xTaskNotifyGive(msg.finishedNotifyHandle);
    }
    return true;
}

Where to Go From Here

The obvious next step is to create something like Microsoft's Task Parallel Library (TPL) for the ESP32, perhaps using the coroutine (co_await) support added in C++20, assuming you can convince your ESP32 toolchain to use a recent enough compiler. Even without that, this technique should make it much easier to use that lonely second core. Happy coding!

History

  • 25th February, 2021 - Initial submission

License

This article, along with any associated source code and files, is licensed under The MIT License