Converting a Logging Tool into a full-duplex IPC
In the previous article (Data Logging using IPC in Windows), we presented a basic IPC for data Logging in Windows. This project presented the main tools we are extending here to make it work full-duplex IPC for sending and receiving data between two applications in Windows. In order to keep things simple, we divide the logic in a client and a server, but this will not be a limitation, later we will see why is that.
In that article, we defined a structure with two events - one for read that notifies that a read took place and another one to notify that a write took place- and a memory mapped file for letting all the logging applications to send log data to a listening server. Here, we call this logic a "channel" and based on that, we understand that we need two channels: one for all data that goes out to the other application and another one for all data that the other application will send to this one.
Up to now, we are following the previous article, all we have to do is change things here and there and we are done. Piece of cake. But this is not a logging tool so we have to overcome a number of limitations. One of them is that we were limited to send a text that at most will be slightly less than the size of the memory mapped file. Here, we have to be able to send as much data as we want, no limit. If you are connecting a database provider and a data processing tool, it makes no sense to limit the size of the returned records. In order to achieve that, we will rely on the fact that the sender locks the sending channel, so it will be able to split the package in as many pieces as it wants while remaining sure that it will not be mixed with packages from any other thread and that it will be received in order. Because of that, we don't need to send the package size in the channel, but just send chunks of data and let know which chunk is the beginning of a new package, and when it finishes.
BOOLEAN cs_ipc::send_data(void* buffer, int size)
{
EnterCriticalSection(&m_send_buffer_cs);
int offset = 0, pending_data = size;
uint8_t flags_val = IPC_FLAG_DATA_BEGIN;
PIO_PACKAGE_DATA header = (PIO_PACKAGE_DATA)m_send_buffer;
header->header.application = APPLICATION_DATA;
while (offset < size)
{
header->header.size = (UINT16)min(pending_data +sizeof(IO_PACKAGE_DATA), m_max_data_size);
int data_send = header->header.size - sizeof(IO_PACKAGE_DATA);
if (offset+data_send == size) flags_val |= IPC_FLAG_DATA_END;
header->flags = flags_val;
memcpy(m_send_buffer+ sizeof(IO_PACKAGE_DATA), ((char*)buffer)+offset,
header->header.size- sizeof(IO_PACKAGE_DATA));
internal_send_data(m_send_buffer, header->header.size);
flags_val = 0;
offset += data_send;
pending_data -= data_send;
}
LeaveCriticalSection(&m_send_buffer_cs);
return true;
}
Application Connection Aware
Another important feature is that we have to be aware if the other process goes down. We make the assumption here that this is the only case the communication can be interrupted, and IPC connection will last for all the time the applications are running. Let's say you are streaming a movie to an application and it is closed in the middle of the streaming, how do you know? To be aware of that, we modified the shared data structure adding the writing process id:
typedef struct {
UINT32 writer_process_id;
UINT32 first_element_offset;
UINT32 next_reading_pointer;
UINT32 next_writing_pointer;
BYTE flags[8];
} MMIO_FILE_HEADER, *PMMIO_FILE_HEADER;
This value is obtained using the function GetCurrentProcessId()
. Now, each process will always know who is the other one. We get a handle to the writing process using OpenProcess(SYNCHRONIZE, FALSE, data_header->writer_process_id)
, and we use it in our multiple wait for objects:
waitOn[0] = log_avail_ev;
waitOn[1] = self->m_terminate;
waitOn[2] = self->m_peer_process_handle;
while (true)
{
int wait_result = WaitForMultipleObjects(3, waitOn, FALSE, INFINITE);
if (wait_result == WAIT_OBJECT_0 + 2) {
...
}
if (wait_result == WAIT_OBJECT_0 + 1) {
self->m_running = false;
break;
}
if (wait_result != WAIT_OBJECT_0) {
self->error_handler(L"waitForData > WaitForMultipleObjects failed - waitResult = %u\n",
wait_result);
goto error_case;
break;
}
standard processing, data is available...
...
}
But when the first application starts, we don't have the other process id, so in that case, we have to wait for only the first two events. This situation appears again in case the other process is stopped, in which case we have to close the other process handle and wait again for the first two events. Finally, we have to notify the application that the other process has stopped.
while (true)
{
int wait_result = WaitForMultipleObjects(NULL == waitOn[2] ? 2 : 3, waitOn, FALSE, INFINITE);
if (wait_result == WAIT_OBJECT_0 + 2) {
waitOn[2] = NULL;
CloseHandle(self->m_peer_process_handle);
self->m_status = IPC_STATUS_PEER_DISCONNECTED;
if (NULL != self->m_status_change_handler)
self->m_status_change_handler
(IPC_STATUS_PEER_DISCONNECTED, self->m_status_change_handler_parameter);
continue;
}
...
On the other hand, when a process connects, we have to read its process id, create a handle to this process and start waiting on that as well. And the way we get aware that the process is connected is using a new kind of application data. If you are lost by now with what I mean with application data kind, go back to the previous article where I presented the way data is transferred in this IPC logic. The new kind of application data is APPLICATION_PEER_CONNECT
, and when we receive it, we have to notify the application that the other process is connected.
case APPLICATION_PEER_CONNECT:
{
PIO_HEADER msg_header = (PIO_HEADER)((char*)memory + data_header->next_reading_pointer);
self->m_peer_process_handle = OpenProcess(SYNCHRONIZE, FALSE, data_header->writer_process_id);
if (NULL == self->m_peer_process_handle)
self->error_handler(L"Unexpected error trying to open process id: %u - lastError: 0x%X",
data_header->writer_process_id, GetLastError());
else {
waitOn[2] = self->m_peer_process_handle;
self->m_status = IPC_STATUS_PEER_CONNECTED;
if (NULL != self->m_status_change_handler)
self->m_status_change_handler(IPC_STATUS_PEER_CONNECTED,
self->m_status_change_handler_parameter);
}
data_header->next_reading_pointer += msg_header->size;
break;
}
We are almost done by now. We start the server and it realizes when the client connects and notifies it. It also knows when the client is stopped, so it can react to that situation.
The client will be aware as well when the server is stopped. And when the client starts, it knows if the server is running because otherwise it will be unable to open the memory mapped file or the events.
Server Start|------> Client Connected --------> Client Disconnected ----------> Server Stop|
| |
| |
| \----> Notify Client Disconnection
|
\---> Server Notify Client Connection
Client Start & Server is detected -------------> Server Disconnect ---------> Client Stop|
| |
| \----> Notify Server Disconnect
|
\-> Send APPLICATION_PEER_CONNECT
If the server was running and the client started and then the server stops and is started again, it will know that the client is still running because when it will try to create the events, GetLastError()
will return ERROR_ALREADY_EXISTS
. The objects already created by the server in the previous instance will stay there because the client is connected to them. Then the server sends APPLICATION_PEER_CONNECT
package.
Server Start|--> Client Connected --> Server Stop ----> Server Start ----> Server Stop|
| |
| |
| \----> Send APPLICATION_PEER_CONNECT
|
\---> Server Notify Client Connection
And the most striking case is when the client starts and the server is not running at all. It cannot connect to the objects and m_running
stays false
. The only option we have here is to create a special event m_client_wait_for_server
and a special thread that will wait for it to be triggered.
DWORD WINAPI cs_ipc::waitForServer(LPVOID data)
{
cs_ipc * self = (cs_ipc *)data;
HANDLE waitOn[3];
waitOn[0] = self->m_terminate;
waitOn[1] = self->m_client_wait_for_server;
int wait_res = WaitForMultipleObjects(2, waitOn, FALSE, INFINITE);
if (wait_res == WAIT_OBJECT_0) return 0;
else
if (wait_res == WAIT_OBJECT_0 + 1)
{
self->initialize();
if (self->m_running && (self->m_status == IPC_STATUS_PEER_CONNECTED))
{
if (NULL != self->m_status_change_handler)
self->m_status_change_handler(self->m_status, self->m_status_change_handler_parameter);
self->writePeerConnected();
}
if (!self->m_running)
return 0;
}
else
{
self->error_handler(L"waitForServer > WaitForMultipleObjects failed - waitResult = %u\n",
wait_res);
}
return 0;
}
If the server starts and, while creating the events, GetLastError()
doesn't return ERROR_ALREADY_EXISTS
, it has to try to open LOG_NAME_CLIENT_WAIT_EV
event to test wherever a client is running waiting for a server, so it kicks off client connection and it starts running. Class constructor now notifies the connection or waits for the other process based on initialization result.
initialize();
if (!m_running && !m_is_server) {
wchar_t element_name[200];
if (NULL == (m_client_wait_for_server =
CreateEvent(NULL, true, false, ipc_encode_string
(element_name, 200, LOG_NAME_CLIENT_WAIT_EV, m_prefix.c_str())))) return;
if (NULL == m_terminate)
if (NULL == (m_terminate = CreateEvent(NULL, TRUE, FALSE, NULL))) goto failed;
if (0 == m_client_wait_for_server_thread)
if (0 == (m_client_wait_for_server_thread = CreateThread
(NULL, 0, cs_ipc::waitForServer, this, 0, NULL))) goto failed;
} else
if (m_running && !m_is_server)
{
if (m_status == IPC_STATUS_PEER_CONNECTED)
writePeerConnected();
}
else
if (m_running && m_is_server)
{
if (m_status == IPC_STATUS_PEER_CONNECTED) {
writePeerConnected();
}
else
{
wchar_t element_name[200];
HANDLE client_wait;
if (NULL == (client_wait = OpenEvent(EVENT_MODIFY_STATE, false,
ipc_encode_string(element_name, 200, LOG_NAME_CLIENT_WAIT_EV, m_prefix.c_str())))) return;
SetEvent(client_wait);
CloseHandle(client_wait);
}
}
As you can see, using the code of the previous article, we were able to create an IPC logic to connect two processes for sending and receiving data, and being aware of the connection status to the other process. All this is implemented using just three callbacks (data available, connection status change and error handler) and the single send_data
method. Quite simple, yet very sophisticated at the same time. Hope you enjoy it and can use it in some projects you have.