This article focuses mainly on awaitable objects, tasks and co_await. It comes with a real demo sample that streams both requests and responses between client and server with the best network efficiency. The provided sample code, as well as the other samples listed at the end, can be compiled with GCC 10.0.1 and Visual C++ 2017 or later.
Introduction
C++20 has introduced a set of new features that will change the way we design and write applications in modern C++. Coroutines are arguably the most important of these new features.
C++20 coroutines involve many new concepts such as awaitable objects, tasks, promise types, co_await, co_yield, co_return, and so on. However, few real samples are available on the web for learning at such an early stage.
UDAParts has developed a powerful and secure communication framework named SocketPro. It is built around continuous inline request/result batching and real-time stream processing, using asynchronous data transfer and parallel computation for the best network efficiency, development simplicity, performance and scalability, along with many other features described at the GitHub site. The framework supports modern language features such as anonymous functions, closures, lambda expressions, async/await, future, promise, yield and so on. It is already integrated with the C++20 coroutine feature, with many samples for client/server communication, database access, file exchange between client and server, and persistent message queues. These samples should help you understand C++20 coroutines.
This short article touches on all of these concepts except co_yield and co_return, and focuses mainly on awaitable objects, tasks and co_await. Further, it comes with a real demo sample that streams both requests and responses between client and server with the best network efficiency. The provided sample code, as well as the other samples listed at the end, can be compiled with GCC 10.0.1 and Visual C++ 2017 or later.
Preparations
First of all, clone the SocketPro package from the GitHub site. The original sample code is located in the file ../socketpro/tutorials/cplusplus/hello_world/client/cw_hw.cpp. Next, navigate to the SocketPro development index file ../socketpro/doc/dev_guide.htm.
It is highly recommended to read through the following short articles in order:
- Fundamentals about SocketPro Communication Framework
- CUQueue and Compatibility among Different Development Languages
- Get started with SocketPro
- C/C++ (C++11 or later)
The third article guides you through distributing all required components. Specifically, we will use the sample server application all_servers for the coming tests. At this point, after distributing these components properly, run it as a remote SocketPro server listening on port 20901.
Main Test Code
The below code snippet 1 shows the main unit test code related to the C++20 coroutine task and the queue of awaitable objects. First of all, the C++20 coroutine header file coroutine must be included, as shown at the very top of the code snippet.
#if __has_include(<coroutine>)
#include <coroutine> //GCC 10.0.1 or later
#elif __has_include(<experimental/coroutine>)
#include <experimental/coroutine> // Visual C++ 2017 and 2019 before 16.8.0
#else
static_assert(false, "No co_await support");
#endif
#include <iostream>
#include <deque>
#include "../../uqueue_demo/mystruct.h"
#include "HW.h"
using namespace std;
using namespace SPA;
using namespace SPA::ClientSide;
CMyStruct ms0;
typedef CSocketPool<HelloWorld> CMyPool;
deque<RWaiter<wstring>> CreateAwaitables(CMyPool::PHandler& hw) {
    // body shown in code snippet 2
}
CAwTask MyTest(CMyPool::PHandler& hw) {
    // body shown in code snippet 2
}
int main(int argc, char* argv[]) {
    CMyPool spHw;
    CConnectionContext cc("localhost", 20901, L"MyUserId", L"MyPassword");
    if (!spHw.StartSocketPool(cc, 1)) {
        wcout << "No connection to remote hello world server\n";
    }
    else {
        auto hw = spHw.Seek();
        SetMyStruct(ms0);
        MyTest(hw);
    }
    wcout << L"Press a key to kill the demo ......\n";
    ::getchar();
    return 0;
}
Code snippet 1: Main unit test code involved with C++20 coroutine task and awaitable objects queue
We are going to focus on the two functions CreateAwaitables and MyTest later. The first one returns a deque of C++20 awaitable objects of type RWaiter<wstring>. The second one returns a C++20 task, CAwTask, which plays a role similar to the keyword async in other development environments such as .NET, JavaScript and Python.
Now, let us move to the main function. Inside the function, we first create a socket pool having one non-blocking socket session hosted within one worker thread. A socket pool can have any number of worker threads, and each worker thread can host any number of non-blocking socket sessions to different remote servers. Here, for code clarity, we use one non-blocking socket session hosted within one worker thread. In practice, it is recommended that a socket pool have one worker thread in most cases.
At the end, we call the method MyTest to demonstrate a C++20 coroutine and co_await. You can compile this small piece of code with the compile options given in comments right before the main function, and test this sample client application against the above-mentioned server all_servers.
Dissecting MyTest, CAwTask and CreateAwaitables
First of all, the method MyTest within the below code snippet 2 is a C++20 coroutine, which always returns an instance of CAwTask. In functionality, CAwTask is similar to the keyword async of other languages such as .NET, JavaScript and Python. However, in C++20 we can customize the class CAwTask and its inner class promise_type, as shown at the top of the snippet. In most cases, the class definition here will meet your needs without any modification. For details, you may refer to this article. Pay attention to the methods get_return_object and final_suspend. When a coroutine is called, an instance of CAwTask is created by calling the method promise_type::get_return_object. At the end, the method promise_type::final_suspend is called when the C++20 coroutine MyTest is about to exit. It is recommended that you set debug breakpoints and step through them for a better understanding.
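struct CAwTask {
    struct promise_type {
        // called once when the coroutine is called
        CAwTask get_return_object() {
            return { };
        }
        std::suspend_never initial_suspend() {
            return { };
        }
        // called when the coroutine is about to exit;
        // the C++20 standard requires it to be non-throwing
        std::suspend_never final_suspend() noexcept {
            return { };
        }
        void return_void() {
        }
        void unhandled_exception() {
        }
    };
};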
deque<RWaiter<wstring>> CreateAwaitables(CMyPool::PHandler& hw) {
    auto aw0 = hw->wait_send<wstring>(idSayHello, L"John", L"Dole");
    auto aw1 = hw->wait_send<wstring>(idSayHello, L"Hillary", L"Clinton");
    auto aw2 = hw->wait_send<wstring>(idSayHello, L"Donald", L"Trump");
    auto aw3 = hw->wait_send<wstring>(idSayHello, L"Joe", L"Biden");
    auto aw4 = hw->wait_send<wstring>(idSayHello, L"Mike", L"Pence");
    return {aw0, aw1, aw2, aw3, aw4};
}
CAwTask MyTest(CMyPool::PHandler& hw) {
    try {
        auto qWaiter = CreateAwaitables(hw);
        BWaiter ws = hw->wait_sendRequest(idSleep, (int)5000);
        RWaiter<CMyStruct> wms = hw->wait_send<CMyStruct>(idEcho, ms0);
        while(qWaiter.size()) {
            wcout << co_await qWaiter.front() << "\n";
            qWaiter.pop_front();
        }
        wcout << "Waiting sleep ......\n";
        CScopeUQueue sb = co_await ws;
        assert(sb->GetSize() == 0);
        CMyStruct ms = co_await wms;
        wcout << "(ms == ms0): " << ((ms == ms0) ? 1 : 0)
              << "\nAll requests processed\n";
    }
    catch (CServerError& ex) {
        wcout << ex.ToString() << "\n";
    }
    catch (CSocketError& ex) {
        wcout << ex.ToString() << "\n";
    }
    catch (exception& ex) {
        wcout << "Unexpected error: " << ex.what() << "\n";
    }
}
Code snippet 2: CreateAwaitables and MyTest for demonstration of C++20 coroutine task and awaitable objects
Here, MyTest calls the method CreateAwaitables to create five awaitable objects (aw0, aw1, aw2, aw3 and aw4). Each of these awaitable objects corresponds to one request from client to server. We put them into a deque container for later use. Further, MyTest obtains two extra awaitable objects, ws and wms, by sending two more requests to the server for processing. All seven requests and their expected responses are streamed. SocketPro internally uses its inline batching algorithm to batch request and response data on both the client and server sides. By design, SocketPro supports streaming both requests and responses for the best network efficiency, which significantly boosts both application performance and scalability. Our study results show that performance improvements can range from 90% on a local area network up to 30000% on a wide area network. This feature is one of SocketPro's strong points, and it is hard to find in other frameworks.
After collecting all awaitable objects, we start to co_await them one by one inside the try block of MyTest. Note that all RWaiter<wstring>, BWaiter and RWaiter<CMyStruct> awaitable objects are copyable and moveable. You can put them into standard library containers such as vector, deque, stack, map, and so on.
Inside the C++20 coroutine function MyTest, you can use try/catch for exception handling, as the three catch blocks show. The error CServerError is an exception that comes from a remote SocketPro server. For example, you will get such an exception if you uncomment line 25 but comment out line 24 of the original numbered listing, within CreateAwaitables. Alternatively, you can also get a CServerError exception by giving a negative value instead of positive 5000 for the second input of the idSleep request sent with wait_sendRequest. The SocketPro client adapter will also throw a CSocketError communication error if either an underlying socket is closed or a request is canceled. To test the exception CSocketError, you can abruptly kill the test sample server application all_servers right after running this client application.
Uncovering RWaiter/wait_send and BWaiter/wait_sendRequest
The below code snippet 3 shows the definitions of two awaitable classes, RWaiter and BWaiter. The first one is a template class that derives from the base awaitable class CWaiterBase<R>, while the second one is a regular class deriving from CWaiterBase<CScopeUQueue>. Here, the template argument R and CScopeUQueue represent the expected returning data and an array of bytes from a remote SocketPro server, respectively. The array of bytes is used later to de-serialize zero, one or more values of different types.
template<typename R>
struct RWaiter : public CWaiterBase<R> {
    RWaiter(CAsyncServiceHandler* ash, unsigned short reqId,
            const unsigned char* pBuffer, unsigned int size)
        : CWaiterBase<R>(reqId) {
        auto& wc = this->m_wc;
        if (!ash->SendRequest(reqId, pBuffer, size, [wc](CAsyncResult & ar) {
                try {
                    ar >> wc->m_r;
                } catch (...) {
                    wc->m_ex = std::current_exception();
                }
                wc->resume();
            }, this->get_aborted(), this->get_se())) {
            ash->raise(reqId);
        }
    }
};
template<typename R, typename ... Ts>
RWaiter<R> wait_send(unsigned short reqId, const Ts& ... args) {
    CScopeUQueue sb;
    sb->Save(args ...);
    return RWaiter<R>(this, reqId, sb->GetBuffer(), sb->GetSize());
}
struct BWaiter : public CWaiterBase<CScopeUQueue> {
    BWaiter(CAsyncServiceHandler* ash, unsigned short reqId,
            const unsigned char* pBuffer, unsigned int size)
        : CWaiterBase<CScopeUQueue>(reqId) {
        auto& wc = m_wc;
        if (!ash->SendRequest(reqId, pBuffer, size, [wc](CAsyncResult & ar) {
                wc->m_r->Swap(ar.UQueue);
                wc->resume();
            }, get_aborted(), get_se())) {
            ash->raise(reqId);
        }
    }
};
template<typename ... Ts>
BWaiter wait_sendRequest(unsigned short reqId, const Ts& ... args) {
    CScopeUQueue sb;
    sb->Save(args ...);
    return BWaiter(this, reqId, sb->GetBuffer(), sb->GetSize());
}
Code snippet 3: Decoding RWaiter/wait_send and BWaiter/wait_sendRequest
Looking at the two awaitable class constructors, you will find that SocketPro always sends a request from client to server with a request id reqId, an array of bytes pBuffer having a given length size, and three callbacks (a lambda expression [wc](CAsyncResult & ar) {......}, get_aborted and get_se). The lambda expression callback is used to receive the returned result from the server. Inside the callback, you must resume the coroutine by calling the method resume (wc->resume()) after processing the server-returned result. The second callback, get_aborted, is used to monitor two events, request canceled and socket session closed. The last callback, get_se, is used to track an exception error from the remote server. In short, the three callbacks cover all communication errors and possible results, including both the expected result and an exception error from the server. We will discuss the two callbacks, get_aborted and get_se, with the coming code snippet 4.
Before ending this section, it is worth noting that all three callbacks are always called within a socket pool worker thread. You can use the two template methods, wait_send and wait_sendRequest, to send any type of request to a remote SocketPro server. They immediately return C++20 awaitable objects without waiting for the server response. These objects are co_awaited later for the expected results or for the different types of exceptions that may occur.
Decoding Template Class CWaiterBase
The below code snippet 4 shows the implementation of the SocketPro C++20 base awaitable, located in the file ../socketpro/include/aclientw.h. First of all, see the two methods get_aborted and get_se near the end of the snippet, which generate the two callbacks DDiscarded and DServerException, respectively. Besides their use in the previous code snippet 3, they are also reused within SocketPro remoting file (../socketpro/include/streamingfile.h), server persistent message queue (../socketpro/include/aqhandler.h) and database handlers (../socketpro/include/udb_client.h). The two generated callbacks are always called by socket pool worker threads. Further, each callback always resumes the C++20 coroutine at the end, by calling wc->resume(), after setting an exception properly.
typedef std::coroutine_handle<> CRHandle;
template<typename R>
struct CWaiterBase {
    struct CWaiterContext {
        CWaiterContext(unsigned short reqId)
            : m_done(false), m_reqId(reqId) {
        }
        CWaiterContext(const CWaiterContext& wc) = delete;
        CWaiterContext(CWaiterContext&& wc) = delete;
        CWaiterContext& operator=(const CWaiterContext& wc) = delete;
        CWaiterContext& operator=(CWaiterContext&& wc) = delete;
        bool await_ready() noexcept {
            CSpinAutoLock al(m_cs);
            return m_done;
        }
        void resume() noexcept {
            CSpinAutoLock al(m_cs);
            if (!m_done) {
                m_done = true;
                if (m_rh) {
                    m_rh.resume();
                }
            }
        }
        unsigned short get_id() {
            return m_reqId;
        }
        R m_r;
        std::exception_ptr m_ex;
    private:
        bool await_suspend(CRHandle rh) noexcept {
            CSpinAutoLock al(m_cs);
            if (!m_done) {
                m_rh = rh;
                return true;
            }
            return false;
        }
        CSpinLock m_cs;
        bool m_done;
        CRHandle m_rh;
        unsigned short m_reqId;
        friend struct CWaiterBase;
    };
    CWaiterBase(unsigned short reqId)
        : m_wc(new CWaiterContext(reqId)) {
    }
    bool await_suspend(CRHandle rh) noexcept {
        return m_wc->await_suspend(rh);
    }
    R&& await_resume() {
        if (m_wc->m_ex) {
            std::rethrow_exception(m_wc->m_ex);
        }
        return std::move(m_wc->m_r);
    }
    bool await_ready() noexcept {
        return m_wc->await_ready();
    }
protected:
    DServerException get_se() noexcept {
        auto& wc = m_wc;
        return [wc](CAsyncServiceHandler* ash, unsigned short reqId, const
            wchar_t* errMsg, const char* errWhere, unsigned int errCode) {
            wc->m_ex = std::make_exception_ptr(
                CServerError(errCode, errMsg, errWhere, reqId));
            wc->resume();
        };
    }
    DDiscarded get_aborted() noexcept {
        auto& wc = m_wc;
        return [wc](CAsyncServiceHandler* h, bool canceled) {
            if (canceled) {
                wc->m_ex = std::make_exception_ptr(CSocketError(
                    REQUEST_CANCELED, REQUEST_CANCELED_ERR_MSG, wc->get_id(), false));
            } else {
                CClientSocket* cs = h->GetSocket();
                int ec = cs->GetErrorCode();
                if (ec) {
                    std::string em = cs->GetErrorMsg();
                    wc->m_ex = std::make_exception_ptr(
                        CSocketError(ec, Utilities::ToWide(em).c_str(), wc->get_id(), false));
                } else {
                    wc->m_ex = std::make_exception_ptr(CSocketError(SESSION_CLOSED_AFTER,
                        SESSION_CLOSED_AFTER_ERR_MSG, wc->get_id(), false));
                }
            }
            wc->resume();
        };
    }
    std::shared_ptr<CWaiterContext> m_wc;
};
Code snippet 4: Implementation of SocketPro C++20 base awaitable object
A C++20 awaitable class must implement three required methods, await_ready, await_suspend and await_resume, which are used by the operator co_await. When co_awaiting an awaitable object, the method await_ready is called first to check whether the expected result is already available. If this method returns true, the result is indeed available now, and co_awaiting immediately calls the method await_resume to return the result, without suspending the coroutine or calling the method await_suspend at all. This situation often happens with the SocketPro communication framework when multiple requests are streamed, as in this example. Conversely, if the method await_ready returns false, which is the more common case, the result is not available at the moment, and co_awaiting obtains a coroutine handle (resume point) and calls the method await_suspend.
The method await_suspend here returns a bool value. The method can also be defined to return void or a coroutine handle. If the method returns true (or returns void), which happens in most cases, the coroutine handle is remembered (m_rh = rh inside CWaiterContext::await_suspend), and the coroutine stays suspended while control returns to its caller. A socket pool worker thread is expected to resume it later from the resume point by calling the coroutine handle method resume (m_rh.resume() inside CWaiterContext::resume). If the method returns false, the coroutine resumes immediately and co_awaiting immediately calls the method await_resume for the expected result or exception. This situation may also happen with SocketPro, although the chances are low. The method await_suspend can also be defined to return a customized coroutine handle, which is beyond the scope of this short article.
At the end, the method await_resume is called through the operator co_await after one of the following three cases happens.
- await_ready returns true
- await_suspend returns false
- The coroutine handle method resume is called (m_rh.resume() inside CWaiterContext::resume) from a socket pool worker thread
Inside the method await_resume, an exception is thrown with std::rethrow_exception if one has indeed been recorded. Here, the method returns an rvalue reference. The template argument R is expected to support a move or copy constructor. A move constructor is preferred over a copy constructor to avoid memory copying. The method await_resume can also be defined to return void if no result is expected.
Other Examples
After understanding all of the above, you can study the C++20 coroutine feature further with the other samples below, using the sample SocketPro server all_servers.
- File exchanging between client and server: ../socketpro/tutorials/cplusplus/remote_file/client/rf_cw.cpp
- Server persistent queue: ../socketpro/tutorials/cplusplus/server_queue/client/sq_cw.cpp
- SQLite database: ../socketpro/stream_sql/usqlite/test_csqlite/cw_sqlite.cpp
C++20 Coroutine/co_await Performance Advantage
C++20 coroutines are stackless and do not copy stack data, which can lead to significant performance improvements in addition to their asynchronous computation. SocketPro comes with sample client/server code in the directory ../socketpro/samples/latency for your own performance testing. The SocketPro C++ adapter supports modern language features such as anonymous functions, lambda expressions, std::promise, std::future, coroutines and co_await, as well as others.
Our performance studies show that C++20 co_await consistently saves about 4 to 6 microseconds per client/server request in comparison to the std::promise/std::future/get approach, no matter whether the client platform is Windows or Linux. The C++20 co_await performance gain is huge!
History
- 3rd December, 2020: Initial release
- 10th February, 2023: Minor update for adding support to clang 14 or later