
Using Posix Message Queue in C#, Mono on Linux

3 Oct 2015
This tip demonstrates how to use POSIX message queues for inter-process communication (IPC) from C# with Mono on Linux.

Introduction

I was looking for a fast and reliable way to do inter-process communication on Linux from C# on a single node. POSIX message queues are a popular way to pass messages between processes on the same machine.

Background

Linux offers several options for IPC, such as UDP/TCP sockets and POSIX or System V message queues. When all the processes run on the same node, POSIX message queues are a good choice from a performance point of view. I was developing an application in C# with Mono and could not find a good article on the subject, so I am posting this tip in the hope that it helps others developing applications with C# and Mono.

Using the Code

Reproducing all the complex C structures and error codes in C# would have been tedious and error-prone, so I divided the work into two parts. First, I wrote a wrapper in C for the POSIX message queue functions, and then I wrote a C# wrapper that calls those functions.

The C wrapper, posixwrapper.c, contains wrapper functions to open, close and remove a POSIX message queue, to send and receive messages, and to register for notification. It encapsulates complex data structures such as mq_attr, timespec and sigevent, and exposes simple data types that can be passed from C#.

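#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <signal.h>
#include <time.h>

//Wraps the mq_attr structure and exposes plain ints instead.
mqd_t pmq_open_with_attr (const char *name, int oflag,int mode, int qSize, int eSize,int *errnum)
{
    mqd_t ret;
    struct mq_attr attr;
    /* initialize the queue attributes */
    attr.mq_flags = 0;
    attr.mq_maxmsg = qSize;    /* maximum number of messages in the queue */
    attr.mq_msgsize = eSize;   /* maximum size of one message, in bytes */
    attr.mq_curmsgs = 0;
    /* mq_open expects a pointer to the attributes, not the structure itself */
    ret = mq_open(name, oflag, (mode_t)mode, &attr);
    if (ret == -1){
        if (errnum != NULL)    *errnum = errno;
    }
    return ret;
}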

// Expose sigval instead of the complex sigevent structure.
// NotifyHandler must be compatible with void (*)(union sigval).
int pmq_notify (mqd_t mqdes, NotifyHandler notifyHandler, int *errnum)
{
    int ret;
    struct sigevent sev;
    // Notify by running the handler in a new thread
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = notifyHandler;
    //Could be pointer to pthread_attr_t structure
    sev.sigev_notify_attributes = NULL;
    // Pass the queue descriptor to the handler
    sev.sigev_value.sival_int = mqdes;
    ret = mq_notify(mqdes, &sev);
    if (ret == -1){
        if (errnum != NULL) *errnum = errno;
    }
    return ret;
}

//Expose a timeout in seconds instead of the timespec structure.
int pmq_receive (mqd_t mqdes, char *msg_ptr, size_t msg_len,
           unsigned int *msg_prio, int timeout, int *errnum)
{
    int ret;
    struct timespec tm;

    if (timeout == 0){
        // No timeout: blocks on an empty queue unless O_NONBLOCK is set
        ret = mq_receive(mqdes, msg_ptr, msg_len, msg_prio);
    } else
    {
        // mq_timedreceive takes an absolute CLOCK_REALTIME deadline
        clock_gettime(CLOCK_REALTIME, &tm);
        tm.tv_sec += timeout;
        // Receive straight into the caller's buffer in both branches
        ret = mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, &tm);
    }

    if (ret == -1){
        if (errnum != NULL) *errnum = errno;
    }
    return ret;
}

After the C wrapper, I wrote a C# class (PosixMq.cs) that declares the extern functions exported from C. The underlying open call takes its flags and mode as octal constants, and C# has no octal literals, so the OpenFlags enum defines the decimal equivalents of the octal values.

 	[Flags]
	public enum OpenFlags {
		//
		// One of these
		//
		O_RDONLY    = 0,
		O_WRONLY    = 1,
		O_RDWR      = 2,

		//
		// Or-ed with zero or more of these
		//
		O_CREAT     = 64, //Octal - 0100
		O_EXCL      = 128, // Octal - 0200
		O_NOCTTY    = 256, // Octal - 0400
		O_TRUNC     = 512, //Octal - 01000 
		O_APPEND    = 1024, //Octal -02000
		O_NONBLOCK  = 2048, // Octal -04000
		O_SYNC      = 1052672, //Octal -04010000

		//
		// These are non-Posix, think of a way of exposing
		// this for Linux users.
		//

		// O_NOFOLLOW  = 512,
		// O_DIRECTORY = 1024,
		// O_DIRECT    = 2048,
		// O_ASYNC     = 4096,
		// O_LARGEFILE = 8192
	}
	[System.Runtime.InteropServices.StructLayout(LayoutKind.Explicit)]
	public struct sigval {          /* Data passed with notification */
		[System.Runtime.InteropServices.FieldOffset(0)]
		public int sival_int;         /* Integer value */
		//[System.Runtime.InteropServices.FieldOffset(0)]
		//public IntPtr? sival_ptr;         /* Pointer value */
	};

	[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
	public delegate void NotifyHandler(sigval sigVal);
	
	public class PosixMq
	{
		const string Posixlibpath =
			"/home/secure/workspace/posixwrapper/Debug/libposixwrapper.so";

		[DllImport(Posixlibpath, EntryPoint = "pmq_open", CallingConvention = CallingConvention.Cdecl)]
		public static extern int pmq_open(string queueName, OpenFlags flags, ref Int32 err);

		[DllImport(Posixlibpath, EntryPoint = "pmq_open_with_attr", CallingConvention = CallingConvention.Cdecl)]
		public static extern int pmq_open_with_attr(string queueName, OpenFlags flags,
			Int32 mode, Int32 qSize, Int32 eSize, ref Int32 err);

		[DllImport(Posixlibpath, EntryPoint = "pmq_send", CallingConvention = CallingConvention.Cdecl)]
		public static extern int pmq_send(int queueHandle, string Message, int size,
			uint messagePriorty, Int32 timeOut, ref Int32 err);

		[DllImport(Posixlibpath, EntryPoint = "pmq_close", CharSet = CharSet.Auto, CallingConvention = CallingConvention.Cdecl)]
		public static extern int pmq_close(int queueHandle, ref Int32 err);

		[DllImport(Posixlibpath, EntryPoint = "pmq_notify", CallingConvention = CallingConvention.Cdecl)]
		public static extern int pmq_notify(int queueHandle, NotifyHandler notifyHandler, ref Int32 err);

		[DllImport(Posixlibpath, EntryPoint = "pmq_receive", CallingConvention = CallingConvention.Cdecl)]
		public static extern int pmq_receive(int queueHandle, StringBuilder Message, int size,
			ref UInt32 messagePriorty, Int32 timeOut, ref Int32 err);

		[DllImport(Posixlibpath, EntryPoint = "pmq_unlink", CallingConvention = CallingConvention.Cdecl)]
		public static extern int pmq_unlink(string queueName, ref Int32 err);
	}
}
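Because the OpenFlags values are hard-coded decimals, it may be worth cross-checking them against the fcntl.h of the target machine; the numeric values of the O_* constants are not the same on every architecture. The following sketch compares the decimal values from the enum with the native constants. The function name count_openflags_mismatches is made up for this illustration and is not part of the download.

```c
#include <fcntl.h>
#include <stdio.h>

/* Compares the decimal values used in the C# OpenFlags enum with the
   O_* constants of this platform. Returns the number of mismatches. */
int count_openflags_mismatches (void)
{
    const struct { const char *name; long managed; long native; } flags[] = {
        { "O_RDONLY",   0,       O_RDONLY   },
        { "O_WRONLY",   1,       O_WRONLY   },
        { "O_RDWR",     2,       O_RDWR     },
        { "O_CREAT",    64,      O_CREAT    },
        { "O_EXCL",     128,     O_EXCL     },
        { "O_NOCTTY",   256,     O_NOCTTY   },
        { "O_TRUNC",    512,     O_TRUNC    },
        { "O_APPEND",   1024,    O_APPEND   },
        { "O_NONBLOCK", 2048,    O_NONBLOCK },
        { "O_SYNC",     1052672, O_SYNC     },
    };
    int mismatches = 0;

    for (size_t i = 0; i < sizeof flags / sizeof flags[0]; i++) {
        if (flags[i].managed != flags[i].native) {
            /* Print decimal and octal so the value can be found in fcntl.h */
            printf("%s: enum has %ld, platform has %ld (octal %lo)\n",
                   flags[i].name, flags[i].managed,
                   flags[i].native, (unsigned long)flags[i].native);
            mismatches++;
        }
    }
    return mismatches;
}
```

The mode argument follows the same rule: the permission bits usually written as octal 0664 (rw-rw-r--) have to be passed from C# as decimal 436.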

Now you can call these functions from C# easily.

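//To create the message queue (436 decimal = 0664 octal, i.e. rw-rw-r--)
Handle = PosixMq.pmq_open_with_attr("/mqtest",
OpenFlags.O_RDWR | OpenFlags.O_CREAT , 436, 10,8912 ,ref err);

//To send a message
//Length + 1 also sends the terminating NUL added by the marshaler
//(assumes ASCII text, i.e. one byte per character)
string message = "Hello  from mono "  + DateTime.Now.ToString ();
PosixMq.pmq_send (Handle, message, message.Length + 1, 0,2, ref err);
//To set up notification
PosixMq.pmq_notify 
(Handle, sigNotify,ref err);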

The important part is writing the notification handler. In POSIX, a notification is delivered only when a message arrives on an empty queue, and the registration is removed once it has fired. The handler must therefore register for notification again and then empty the message queue, so that a new notification is raised when the next message is added. The sigNotify function in the MessageNotify project does this.

static void sigNotify(sigval sigVal)
{
	int numRead;
	int Handle;
	int len;
	Int32 err;
	StringBuilder buffer;
	UInt32 priorty;
	Console.WriteLine ("In Notify function");
	Handle = sigVal.sival_int;
	// Register again before draining so that no new message is missed
	NotifySetup (Handle);
	priorty = 0;
	len = 32768;
	buffer = new StringBuilder(len);
	err = 0;
	// A timeout of 0 means a plain mq_receive, which blocks on an empty
	// queue unless the queue was opened with O_NONBLOCK
	while (true)
	{
		numRead = PosixMq.pmq_receive(Handle,
		buffer, buffer.Capacity, ref priorty, 0, ref err);
		if (numRead < 0)
			break; // queue is empty or the receive failed, see err
		Console.WriteLine("Message is :- " +
		buffer + " numread = " + numRead.ToString());
	}
}

The attached download contains the C project (posixwrapper) and the C# solution PosixDemo (PosixMono, MessageSend, MessageListner).

Points of Interest

POSIX message queues are a fast and reliable means of IPC between processes on a single Linux machine.

History

  • 3rd October, 2015: Initial version

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
