Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

XQueue - a non-MFC C++ class to manage an interprocess queue

0.00/5 (No votes)
17 Jan 2005 1  
XQueue implements a shared-memory first-in first-out (FIFO) queue, based on memory-mapped files, that works on all versions of Windows.

Introduction

CXQueue is a class to create and manage an interprocess queue. CXQueue is intended to offload operations from client app to server app - for example, performing logging actions; or server might function as network gateway, so that details would be transparent to the client.

CXQueue supports FIFO (first-in first-out) communication between multiple writers (clients) and one reader (server) that are running on same machine. Communication is one-way only - from client to server. CXQueue offers no delivery guarantee.

CXQueue may be used in either MFC or SDK programs. It has been tested on Windows 98, Windows 2000, and Windows XP.

Background

Although Windows NT-based platforms provide mechanisms such as mailslots and named pipes that can be used to emulate a queue, these are not available on Win9x systems. There have been some superficial implementations of queues using shared memory and other techniques, but these have been limited in one of two ways:

  1. They are limited to only intraprocess, not interprocess communication;

    or

  2. They are synchronous, meaning that queue is limited to one entry, and server must remove and process entry before client can add another.

An Alternative

If you are reading this, you are probably also familiar with Microsoft queue solution: MSMQ. This is an extremely powerful tool that is mandatory in some situations. However, it is also very complex, non-trivial to install and set up, and has some issues on Win9x systems.

For information concerning issues with MSMQ, please refer to Usenet news groups:

microsoft.public.msmq.deployment
microsoft.public.msmq.networking
microsoft.public.msmq.programming
microsoft.public.msmq.setup

Concept & Facilities

The CXQueue class incorporates both client and server API set. The server calls CXQueue::Create(), while client(s) call CXQueue::Open().

CXQueue is based on shared memory using memory-mapped file. Access to shared memory is controlled by means of two kernel objects: an event (used by clients to ensure quiescence, to allow server to free some space); and a mutex, to control actual writing to queue.

The shared memory (hereafter called MMF, for memory-mapped file), is composed of fixed-size blocks. The fixed size ensures that there will be no fragmentation, and eliminates need for garbage collection. The first block of MMF is used for CXQueue variables. The second block and all further blocks are used for data.

The first block (the variable block) holds data that is used to manage and monitor queue. The data blocks are managed by the use of two doubly-linked lists: free list and used list. Initially, all blocks are assigned to free list. When client writes to queue, one or more blocks are allocated from free list and transferred to used list. This involves simply relinking block numbers; no physical block copying is done, so it is very fast.

The client may write any number of bytes to queue (up to total size of data blocks). The default queue block size is 1024 bytes, but this may be easily changed by modifying one line of code. Typically client→server message is much less than 1024 bytes - usually, it is between 10 and 200 bytes. To assist in determining optimal block size, CXQueue monitors and stores maximum size of client write in queue variable block. You can then use this information to adjust size of queue blocks in your own application.

Regardless of block size chosen, it should be expected that there will be client writes that exceed block size. When this happens, CXQueue determines how many blocks will be needed, and writes data to blocks, splitting data as necessary. (Note: data blocks have header with back and forward links and other information, so there is less than 1024 bytes available in each block for data). If multiple blocks are necessary, continuation flag is set in block header, to indicate that there is another block (which can be found by means of forward link).

The links that have been mentioned are block numbers. Data blocks are numbered from 0 to N-1, although first data block is actually second block in MMF. To calculate byte address for any block, formula is:

block_address = (block_no * BLOCK_SIZE) + base_address

where

block_no is zero-based number of data block,
BLOCK_SIZE is fixed number of bytes in a block,
and base_address is byte address of first data block,
which is simply address returned by MapViewOfFile(), plus
BLOCK_SIZE (to account for variable block).

Note that there is no guarantee that blocks will be contiguous in MMF for multi-block client write. But they will always be delivered to server in correct order.

When client has added entry to queue, notification event informs server that queue needs to be serviced. The server then performs read on queue (usually, two reads, with first read returning only size of queue entry). Then server reads data and returns blocks to free list. Because only server manipulates blocks already in used list, it is not necessary to lock queue (using mutex) until server actually frees blocks. This optimization helps to reduce time that clients are prevented from writing.

As mentioned in previous paragraph, mutex is used to control write access to queue. An event object (mentioned earlier) is also used to synchronize queue access. This event object is used only by clients. Here is why: when client wants to write, first thing it must do is determine if there are enough free blocks to accommodate entire write. If there are, it can then write. But if there are not, it would do no good to use mutex at this point, because using mutex would lock out server as well as all clients, so server would not be able to free any blocks. The client would then be waiting for server to free some blocks, and server would be waiting for client to release mutex.

What event object does is to prevent clients from starting any new writes. Since no one will be adding anything to queue, server will have chance to free some blocks. When enough blocks become free, waiting client can complete write, and then set event to allow other clients to write.

To ensure that messages are processed in first-in first-out manner, newly-written blocks are always appended to end of used list. When server processes queue entries, it always removes entries beginning at front of used list. Server apps may verify proper ordering of message entries by inspecting sequential message id that is stored in each queue entry; multi-block entries will have same message id stored in each block.

In current implementation, CXQueue has been tested with multiple clients, but only one server.

CXQueue API

Here are functions provided by CXQueue:

  • CXQueue() - Constructs CXQueue object.
    ///////////////////////////////////////////////////////////////////////
    //
    // CXQueue()
    //
    // Purpose:     Construct CXQueue object.
    //
    // Parameters:  None
    //
    // Returns:     None
    //
  • ~CXQueue() - Destroys CXQueue object.
    ///////////////////////////////////////////////////////////////////////
    //
    // ~CXQueue()
    //
    // Purpose:     Destroy CXQueue object.
    //
    // Parameters:  None
    //
    // Returns:     None
    //
  • Close() - Closes XQueue object and cleans up.
    ////////////////////////////////////////////////////////////////////////
    //
    // Close()
    //
    // Purpose:     Close XQueue object and clean up.
    //
    // Parameters:  None
    //
    // Returns:     LONG - XQueue error code (see XQueue.h)
    //
  • Create() - Create() is called by XQueue server to create system-wide named XQueue object.
    ////////////////////////////////////////////////////////////////////////
    //
    // Create()
    //
    // Purpose:     Create() is called by the XQueue server to create the
    //              system-wide named XQueue object.
    //
    // Parameters:  lpszQueueName - [in] name of XQueue object 
    //                                      (must be specified)
    //              nQueueSize    - [in] size of queue in blocks
    //              nBlockSize    - [in] size of a block in bytes
    //
    // Returns:     LONG          - XQueue error code (see XQueue.h)
    //
  • Dump() - Dumps (TRACES) info about XQueue object.
    ////////////////////////////////////////////////////////////////////////
    //
    // Dump()
    //
    // Purpose:     Dump (TRACE) info about the XQueue object.
    //
    // Parameters:  None
    //
    // Returns:     None
    //
  • Open() - Open() is called by XQueue clients to open an existing XQueue object.
    /////////////////////////////////////////////////////////////////////////
    //
    // Open()
    //
    // Purpose:     Open() is called by XQueue clients to open an existing 
    //              XQueue object.
    //
    // Parameters:  lpszQueueName - [in] name of XQueue object (must be 
    //                                                 specified)
    // Returns:     LONG          - XQueue error code (see XQueue.h)
    //
  • Read() - Reads message from queue. Typically used only by XQueue servers.
    //////////////////////////////////////////////////////////////////////////
    //
    // Read()
    //
    // Purpose:     Read message from the queue.  Typically used only by XQueue
    //              servers.
    //
    // Parameters:  lpData       - [out] Pointer to the buffer that receives the
    //                             data read from the file
    //              lpcbSize     - [in/out] size in bytes of lpData;  if lpData
    //                             is NULL, the size of the required buffer will
    //                             returned in lpcbSize.
    //              lpnMessageId - [out] the message id contained in the 
    //                                                    queue block
    //              pdwThreadId  - [out] the thread id of the thread that queued
    //                             the message
    //              dwTimeOut    - [in] specifies the time-out interval, in
    //                             milliseconds
    //
    // Returns:     LONG         - XQueue error code (see XQueue.h)
    //
  • Write() - Writes string to queue. Typically used only by XQueue clients.
    ////////////////////////////////////////////////////////////////////////
    //
    // Write()
    //
    // Purpose:     Write string to the queue.  Typically used only by XQueue
    //              clients.
    //
    // Parameters:  lpszString - [in] Pointer to the buffer that contains the
    //                           nul-terminated string to queue
    //              dwTimeOut  - [in] specifies the time-out interval, in
    //                           milliseconds
    //
    // Returns:     LONG       - XQueue error code (see XQueue.h)
    //
  • Write() - Writes byte data to queue. Typically used only by XQueue clients.
    /////////////////////////////////////////////////////////////////////////
    //
    // Write()
    //
    // Purpose:     Write byte data to the queue.  Typically used only by 
    //              XQueue clients.
    //
    // Parameters:  lpData    - [in] Pointer to the buffer that contains the
    //                          data to queue
    //              nDataSize - [in] number of bytes of data in lpData
    //              dwTimeOut - [in] specifies the time-out interval, in
    //                          milliseconds
    //
    // Returns:     LONG      - XQueue error code (see XQueue.h)
    //

How To Use

To integrate CXQueue class into your app, you first need to add following files to your project:

  • XQueue.cpp
  • XQueue.h
  • XMemmap.cpp
  • XMemmap.h

For details on how to use CXQueue object, refer to code in XQueueClientTestDlg.cpp, XQueueServerTestDlg.cpp, and ServerThread.cpp.

Known Limitations

  1. XQueue offers no delivery guarantee.
  2. XQueue does not guarantee that there will not be duplicates.
  3. XQueue clients and server must be on same machine.

Demo App

XQueueClientTest.exe and XQueueServerTest.exe demonstrate use of CXQueue in client/server scenario. When you start client, it will first try to open XQueue. If this fails, client will try to start server, as you see in messages that are logged:

After you select number of messages and message size, client queues messages, and calculates throughput:

You can start multiple clients and server will update its stats to display number of clients connected:

The server will dynamically display MMF loading as messages are queued:

Frequently Asked Questions

  1. Why use XQueue at all? Why not just use Microsoft's MSMQ?

    There are two issues that you have to deal with if your are considering MSMQ. First, MSMQ is normally not installed on Win98 systems. To install MSMQ 1.0 on Win98 systems, you will need NT4 Option Pack. Also, you must be running either MSMQ 1.0 or MSMQ 2.0 on an NT server to use an MSMQ 1.0 independent client on Win98 (MSMQ 2.0 is not available for Win98). Note that MSMQ is not available on XP Home edition, only on XP Pro.

    Second, MSMQ is complex: 30+ APIs, 100+ properties, 10+ major structs. Plus, installation of MSMQ is non-trivial - you can't just dump some DLLs in SYSTEM directory. Obviously, MSMQ offers some significant features, and if you need those, then MSMQ is for you. Unlike MSMQ, XQueue (client and server) runs on Win98 and XP Home.

  2. Why use XQueue at all? Why not just use named pipes?

    First, unlike named pipes, XQueue (client and server) runs on Win98. Second, XQueue is much easier to integrate into your app than named pipes, and requires no installation procedure.

  3. I don't want to run my app under debugger all the time, just to get TRACE output. How can I see all this error reporting?

    You can use excellent free utility DebugView from Sysinternals. This allows you to see all TRACE output from your debug builds. One very nice feature of DebugView that I cannot live without is ability to filter the output, and colorize any line that contains a particular string.

  4. Can I use XQueue in non-MFC apps?

    Yes. It has been implemented to compile with any Win32 program.

  5. When I try to include XQueue.cpp in my MFC project, I get compiler error,
    XQueue.cpp(1713) : fatal error C1010: unexpected end of file while looking for precompiled header directive.
    How can I fix this?

    When using XQueue in project that uses precompiled headers, you must change C/C++ Precompiled Headers settings to Not using precompiled headers for XQueue.cpp and XMemmap.cpp. Be sure to do this for All Configurations.

  6. When I try to build demo app, I get linker error
    LINK : fatal error LNK1104: cannot open file "mfc42u.lib" Error executing link.exe.
    How can I fix this?

    The default installation options of Visual C++ V6.0 do not install Unicode libraries of MFC, so you might get an error that mfc42u.lib or mfc42ud.lib cannot be found. You can fix this either by installing Unicode libs from VC++ install CD, or by going to Build | Set Active Configuration and selecting one of non-Unicode configurations.

  7. Can we use XQueue in our (shareware/commercial) app?

    Yes, you can use XQueue without charge or license fee. It would be nice to acknowledge my Copyright in your About box or splash screen, but this is up to you.

Future Work

  • Run pair of XQueue queues for two-way communication.
  • Allow multiple servers.
  • Persist queue by creating MMF backed by file instead of swap file.
  • Provide encryption for messages.
  • Provide optional message compression.
  • Validate message content using CRC.

Acknowledgments

Revision History

Version 1.2 - 2005 January 17

  • Initial public release

Usage

This software is released into the public domain. You are free to use it in any way you like, except that you may not sell this source code. If you modify it or extend it, please to consider posting new code here for everyone to share. This software is provided "as is" with no expressed or implied warranty. I accept no liability for any damage or loss of business that this software may cause.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here