Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Microsoft Message Queuing – Log Trade Information using Microsoft SQL Server

0.00/5 (No votes)
15 Mar 2011 1  
This article shows how to create a simple trade logging server using Microsoft Message Queuing and Microsoft SQL Server

Introduction

My previous article, Microsoft Message Queuing – A simple multithreaded client and server, shows how to easily create a simple Microsoft Message Queue based solution.

My motivation for writing that article was to show how easy it can be done in response to a question posted over in the CodeProject Q&A section. I’ll admit that I was a bit surprised when it turned out that the poster still couldn’t get his solution to work.

Turns out that he was working on a solution for logging trades to a database, so I guess I’ll have to take another shot at describing how easily it actually can be done.

Obviously, this isn’t a fully-fledged trading system, but it’s still a take on the principles of how one can process market data using Microsoft Message Queuing and Microsoft SQL Server.

Screenshot of the server

Server2.png

Database

I’d like our little system to be able to deal with a reasonable subset, for this purpose, of data often required for a trading system – so we’re only going to log bids, offers, and actual trades.

So here is our table for storing bids:

CREATE TABLE Bid
(
  /* Each bid is identified by an id */
  Id uniqueidentifier not null primary key,
  /* When was the bid posted */
  TimeStamp datetime not null,
  /* Who posted the bid */
  BidderId uniqueidentifier not null,
  /* What are we bidding for */
  InstrumentId uniqueidentifier not null,
  /* Under what condition will the bidder accept an offer  */
  /* Usually there is a limited set of standard conditions */
  BidConditionsId uniqueidentifier not null,
  /* Unit price */
  Value float not null,
  /* Number of units */
  Volume float not null 
)
go

Our table for storing offers:

CREATE TABLE Offer
(
  /* Each offer is identified by an id */
  Id uniqueidentifier not null primary key,
  /* When was the offer posted */
  TimeStamp datetime not null,
  /* Who posted the offer */
  SellerId uniqueidentifier not null,
  /* Under what condition will the seller accept a bid */
  /* Usually there is a limited set of standard conditions */
  SaleConditionsId uniqueidentifier not null,
  /* What's being offered */
  InstrumentId uniqueidentifier not null,
  /* Unit price */
  Value float not null,
  /* Number of units */
  Volume float not null 
)
go

And finally our trade table:

CREATE TABLE Trade
(
  /* Each trade is identified by an id */
  Id uniqueidentifier not null primary key,
  /* When was the trade done */
  TimeStamp datetime not null,
  /* Who bought */
  SellerId uniqueidentifier not null,
  /* Who sold */
  BuyerId uniqueidentifier not null,
  /* Under what agreement was the trade made */
  TradeAgreementId uniqueidentifier not null,
  /* The instrument, goods, ... */
  InstrumentId uniqueidentifier not null,
  /* Unit price */
  Value float not null,
  /* Number of units */
  Volume float not null 
)
go

The tables can be created using the SQL\CreateDataTables.sql script from the Harlinn.Messaging.Server2 project - it's included with the solution source code.

Inserting Data

The following little query is one I find useful when writing code to perform operations against the database – this time it lists the column names of the Offer table.

select cols.name from sys.all_columns cols
join sys.tables t ON (t.object_id = cols.object_id)
where t.name = 'Offer'

As a certain level of performance is usually required for this kind of solution, we will work directly with SqlConnection and SqlCommand – and the script above keeps me from making too many blunders while creating the code.

public class Offer
{
 public const string INSERT_STATEMENT = 
 "INSERT INTO OFFER(Id,TimeStamp,SellerId,SaleConditionsId,InstrumentId,Value,Volume)"+
 " VALUES(@id,@timeStamp,@sellerId,@saleConditionsId,@instrumentId,@value,@volume)";

 public static void Insert(SqlConnection connection, PayloadOffer offer)
 {
  SqlCommand command = connection.CreateCommand();
  using (command)
  {
   command.CommandText = INSERT_STATEMENT;
   command.Parameters.Add("@id", SqlDbType.UniqueIdentifier).Value = offer.Id;
   command.Parameters.Add("@timeStamp", SqlDbType.DateTime).Value = offer.TimeStamp;
   command.Parameters.Add("@sellerId", SqlDbType.UniqueIdentifier).Value = offer.SellerId;
   command.Parameters.Add("@instrumentId", 
	SqlDbType.UniqueIdentifier).Value = offer.InstrumentId;
   command.Parameters.Add("@saleConditionsId", 
	SqlDbType.UniqueIdentifier).Value = offer.SalesConditionsId;
   command.Parameters.Add("@value", SqlDbType.Float).Value = offer.Value;
   command.Parameters.Add("@volume", SqlDbType.Float).Value = offer.Volume;

   command.ExecuteNonQuery();
  }
 }
}

As you see, working directly with SqlConnection and SqlCommand isn’t all that bad, maybe something to think about next time you’re tempted to take the entity framework for a spin.

As the code is quite similar for the Bid and Trade tables, I will skip going through those.

The Payload

The data we are going to send to the server is based on a simple class hierarchy consisting of PayloadBase, PayloadBid, PayloadOffer and PayloadTrade. PayloadBid, PayloadOffer and PayloadTrade is derived from PayloadBase.

[Serializable]
public abstract class PayloadBase
{
 private Guid id;
 private DateTime timeStamp;

 public PayloadBase()
 { 
 }

 public abstract PayloadType PayloadType
 {
  get;
 }
 // code removed 
}

PayloadType is used to discriminate between PayloadTrade, PayloadBid and PayloadOffer – while we could have used the is operator I tend to feel that using a discriminator makes the code more readable.

Processing the Messages

Just like in the previous article, we will process incoming messages asynchronously, one at a time, using the OnReceiveCompleted method:

private void OnReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
{
 try
 {
  MessageQueue mq = (MessageQueue)source;

  if (mq != null)
  {
   try
   {
    System.Messaging.Message message = null;
    try
    {
     message = mq.EndReceive(asyncResult.AsyncResult);
    }
    catch (Exception ex)
    {
     LogMessage(ex.Message);
    }
    if (message != null)
    {
     PayloadBase payload = message.Body as PayloadBase;
     if (payload != null)
     {
      if (receivedCounter == 0)
      {
       firstMessageReceived = DateTime.Now;
      }
     receivedCounter++;
     if ((receivedCounter % 10000) == 0)
     {
      TimeSpan ts = DateTime.Now - firstMessageReceived;
      string messageText = string.Format
	("Received {0} messages in {1}", receivedCounter, ts);
      LogMessage(messageText);
     }

     try
     {
      switch (payload.PayloadType)
      {
       case PayloadType.Bid:
        {
        PayloadBid bid = (PayloadBid)payload;
        DB.Bid.Insert(sqlConnection, bid);
        }
        break;
       case PayloadType.Offer:
        {
        PayloadOffer offer = (PayloadOffer)payload;
        DB.Offer.Insert(sqlConnection, offer);
        }
        break;
       case PayloadType.Trade:
        {
         PayloadTrade trade = (PayloadTrade)payload;
         DB.Trade.Insert(sqlConnection, trade);
        }
        break;
       }
      }
      catch (Exception e)
      {
       if (isRunning)
       {
        LogMessage(e.Message);
       }
      }
     }
    }
   }
   finally
   {
    if (isRunning)
    {
     mq.BeginReceive();
    }
   }
  }
  return;
 }
 catch (Exception exc)
 {
  if (isRunning)
  {
   LogMessage(exc.Message);
  }
 }
}

If you are not familiar with how binary serialization works, you may be surprised by how simple it is to access the data passed on the queue – as you see, I just cast the retrieved payload to the base class using:

PayloadBase payload = message.Body as PayloadBase; 

If I understood the posting that prompted me to write this article – this wasn’t quite as obvious I initially believed it to be. Anyway – we are now ready to store our data using our simple, but rather efficient DB.<type>.Insert methods – as shown in the switch above.

The Message Generator

client2.png

To take our system for a spin, we need a message generating utility, and it’s nearly identical to the one presented in the previous article, except for the SendMessages method:

private void SendMessages(int count)
{
 Random random = new Random(count);
 string message = string.Format("Sending {0} messages", count);
 LogMessage(message);
 DateTime start = DateTime.Now;
 for (int i = 0; i < count; i++)
 {
  PayloadType payloadType = (PayloadType)(i % 3);
  PayloadBase payload = null;

  switch (payloadType)
  {
   case PayloadType.Bid:
    {
     PayloadBid bid = new PayloadBid();
     bid.Initialize();
     payload = bid;
    }
    break;
   case PayloadType.Offer:
    {
     PayloadOffer offer = new PayloadOffer();
     offer.Initialize();
     payload = offer;
     }
     break;
   case PayloadType.Trade:
    {
     PayloadTrade trade = new PayloadTrade();
     trade.Initialize();
     payload = trade;
    }
    break;
  }

  messageQueue.Send(payload);
 }
 DateTime end = DateTime.Now;
 TimeSpan ts = end - start;
 message = string.Format("{0} messages sent in {1}", count, ts);
 LogMessage(message);
}

We just let the binary formatter do its job – it sends the correct information, even if messageQueue.Send(payload); gets a reference to an object declared as a PayloadBase. As long as the class of the object is declared as [Serializable], the object will be serialized correctly.

Conclusion

With a more comprehensive example at your disposal, I hope I’ve succeeded in illustrating how to create a simple, yet efficient solution based on Microsoft Message Queuing capable of dealing with “real” data loads.

It still has ample room for optimization – the obvious one is to process more than one message at a time, and another is to use multiple paired MessageQueue components and SqlConnections to dequeue messages in parallel.

The main purpose has been to keep the solution simple – while still doing something useful.

Best regards,
Espen Harlinn

History

  • 15th March, 2011: Initial posting

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here