Introduction
I was looking for a fast, reliable way to do interprocess communication (IPC) from C# on Linux within a single node. POSIX message queues are a popular way to pass messages between processes on the same node.
Background
There are several options for IPC on Linux, such as UDP/TCP sockets and POSIX or System V message queues. From a performance point of view, POSIX message queues are a good option when all the processes run on the same node. I was developing an application in C# on Mono and did not find a good article on how to do this, so I am posting this tip in the hope that it helps others developing applications with C# and Mono.
Using the Code
Because handling the C structures and error codes directly from C# was complex and error-prone, I divided the work into two parts. First, I wrote a wrapper in C for the POSIX message queue functions, and then I wrote a C# wrapper to call those functions.
C wrapper for the POSIX functions: posixwrapper.c contains wrapper functions to open, close, and remove a POSIX message queue and to send and receive messages. It encapsulates complex data structures such as mq_attr, timespec, and sigevent, and exposes simple data types that can be passed from C#.
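/* Opens a queue; with O_CREAT it is created to hold up to qSize messages of eSize bytes each. */
mqd_t pmq_open_with_attr (const char *name, int oflag, int mode, int qSize, int eSize, int *errnum)
{
    mqd_t ret;
    struct mq_attr attr;
    attr.mq_flags = 0;
    attr.mq_maxmsg = qSize;
    attr.mq_msgsize = eSize;
    attr.mq_curmsgs = 0;
    /* mq_open expects a pointer to the attributes, not the struct itself */
    ret = mq_open(name, oflag, mode, &attr);
    if (ret == (mqd_t) -1) {
        if (errnum != NULL) *errnum = errno;
    }
    return ret;
}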
/* Registers notifyHandler to run on a new thread when a message arrives on an empty queue. */
int pmq_notify (mqd_t mqdes, NotifyHandler notifyHandler, int *errnum)
{
    int ret;
    struct sigevent sev;
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = notifyHandler;
    sev.sigev_notify_attributes = NULL;
    /* pass the queue descriptor to the handler so it can read from the queue */
    sev.sigev_value.sival_int = mqdes;
    ret = mq_notify(mqdes, &sev);
    if (ret == -1) {
        if (errnum != NULL) *errnum = errno;
    }
    return ret;
}
/* Receives one message. timeout is in seconds; 0 means no timeout (plain mq_receive). */
int pmq_receive (mqd_t mqdes, char *msg_ptr, size_t msg_len,
                 unsigned int *msg_prio, int timeout, int *errnum)
{
    int ret;
    struct timespec tm;
    if (timeout == 0) {
        ret = mq_receive(mqdes, msg_ptr, msg_len, msg_prio);
    } else {
        /* mq_timedreceive takes an absolute CLOCK_REALTIME deadline */
        clock_gettime(CLOCK_REALTIME, &tm);
        tm.tv_sec += timeout;
        /* receive directly into the caller's buffer in both branches */
        ret = mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, &tm);
    }
    if (ret == -1) {
        if (errnum != NULL) *errnum = errno;
    }
    return ret;
}
After the C wrapper, I wrote a C# class (PosixMq.cs) that declares the C functions as extern. It also defines OpenFlags, whose values are the decimal equivalents of the octal constants in the C headers, because the queue-open function takes octal values for the flags and mode.
[Flags]
public enum OpenFlags {
    O_RDONLY = 0,
    O_WRONLY = 1,
    O_RDWR = 2,
    O_CREAT = 64,
    O_EXCL = 128,
    O_NOCTTY = 256,
    O_TRUNC = 512,
    O_APPEND = 1024,
    O_NONBLOCK = 2048,
    O_SYNC = 1052672,
}
[System.Runtime.InteropServices.StructLayout(LayoutKind.Explicit)]
public struct sigval {
[System.Runtime.InteropServices.FieldOffset(0)]
public int sival_int;
};
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void NotifyHandler(sigval sigVal);
public class PosixMq
{
    const string Posixlibpath =
        "/home/secure/workspace/posixwrapper/Debug/libposixwrapper.so";

    [DllImport(Posixlibpath, EntryPoint = "pmq_open", CallingConvention = CallingConvention.Cdecl)]
    public static extern int pmq_open(string queueName, OpenFlags flags, ref Int32 err);

    [DllImport(Posixlibpath, EntryPoint = "pmq_open_with_attr", CallingConvention = CallingConvention.Cdecl)]
    public static extern int pmq_open_with_attr(string queueName, OpenFlags flags,
        Int32 mode, Int32 qSize, Int32 eSize, ref Int32 err);

    [DllImport(Posixlibpath, EntryPoint = "pmq_send", CallingConvention = CallingConvention.Cdecl)]
    public static extern int pmq_send(int queueHandle,
        string Message, int size, uint messagePriority, Int32 timeOut, ref Int32 err);

    [DllImport(Posixlibpath, EntryPoint = "pmq_close",
        CharSet = CharSet.Auto, CallingConvention = CallingConvention.Cdecl)]
    public static extern int pmq_close(int queueHandle, ref Int32 err);

    [DllImport(Posixlibpath, EntryPoint = "pmq_notify", CallingConvention = CallingConvention.Cdecl)]
    public static extern int pmq_notify(int queueHandle, NotifyHandler notifyHandler, ref Int32 err);

    [DllImport(Posixlibpath, EntryPoint = "pmq_receive", CallingConvention = CallingConvention.Cdecl)]
    public static extern int pmq_receive(int queueHandle,
        StringBuilder Message, int size, ref UInt32 messagePriority, Int32 timeOut, ref Int32 err);

    [DllImport(Posixlibpath, EntryPoint = "pmq_unlink", CallingConvention = CallingConvention.Cdecl)]
    public static extern int pmq_unlink(string queueName, ref Int32 err);
}
}
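The decimal values in OpenFlags come from the platform's C headers, so it can be worth checking them on the target machine before relying on them. The following is a small sketch in C, assuming a typical Linux system (for example x86-64 with glibc), where the constants have these values; other platforms may define them differently. The function names are made up for this illustration.

```c
#include <fcntl.h>
#include <stdio.h>

/* Hypothetical helper: returns 1 when this platform's <fcntl.h> constants
   match the decimal values listed in the OpenFlags enum, 0 otherwise. */
int open_flags_match_enum(void)
{
    return O_RDONLY == 0 && O_WRONLY == 1 && O_RDWR == 2 &&
           O_CREAT == 64 && O_EXCL == 128 && O_NOCTTY == 256 &&
           O_TRUNC == 512 && O_APPEND == 1024 && O_NONBLOCK == 2048 &&
           O_SYNC == 1052672;
}

/* Hypothetical helper: prints a few constants in octal and decimal
   so the header values and the enum values can be compared by eye. */
void print_open_flags(void)
{
    printf("O_CREAT    = 0%o = %d\n", (unsigned) O_CREAT, O_CREAT);
    printf("O_EXCL     = 0%o = %d\n", (unsigned) O_EXCL, O_EXCL);
    printf("O_NONBLOCK = 0%o = %d\n", (unsigned) O_NONBLOCK, O_NONBLOCK);
    printf("O_SYNC     = 0%o = %d\n", (unsigned) O_SYNC, O_SYNC);
    /* a permission mode written in octal, shown in decimal as C# would pass it */
    printf("mode 0664  = %d\n", 0664);
}
```

If open_flags_match_enum returns 0 on a given platform, the enum values would need to be adjusted to match that platform's headers.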
Now you can call these functions from C# easily. In the call below, the mode 436 is the decimal form of the octal permission value 0664.
Handle = PosixMq.pmq_open_with_attr("/mqtest",
    OpenFlags.O_RDWR | OpenFlags.O_CREAT, 436, 10, 8912, ref err);
PosixMq.pmq_send(Handle, "Hello from mono " +
    DateTime.Now.ToString(), 100, 0, 2, ref err);
PosixMq.pmq_notify(Handle, sigNotify, ref err);
The important part is writing the notification handler. In POSIX, a notification is delivered only when a message arrives on an empty queue, so once the notification is raised you must empty the message queue in order to be notified again when a new message is added. The registration is also removed after each notification, so you need to set the notification again. The sigNotify function in the MessageNotify project does this.
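static void sigNotify(sigval sigVal)
{
    int numRead;
    int Handle;
    int len;
    Int32 err;
    StringBuilder buffer;
    UInt32 priority;
    Console.WriteLine("In Notify function");
    Handle = sigVal.sival_int;
    // Register again first: the registration is removed each time a notification fires
    NotifySetup(Handle);
    priority = 0;
    len = 32768;
    buffer = new StringBuilder(len);
    numRead = 0;
    err = 0;
    // Drain the queue. If the queue was opened without O_NONBLOCK,
    // this call blocks once the queue is empty.
    while (numRead >= 0)
    {
        numRead = PosixMq.pmq_receive(Handle,
            buffer, buffer.Capacity, ref priority, 0, ref err);
        if (numRead < 0)
            break; // the receive failed, so there is no message to print
        Console.WriteLine("Message is :- " +
            buffer + " numread = " + numRead.ToString());
    }
}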
Please find the attachment, which contains a C project (posixwrapper) and a C# solution, PosixDemo (PosixMono, MessageSend, MessageListner).
Points of Interest
POSIX message queues are a fast and reliable means of IPC on a Linux machine.
History
- 3rd October, 2015: Initial version