Introduction
My previous article, Microsoft Message Queuing – A simple multithreaded client and server, shows how to easily create a simple Microsoft Message Queue based solution.
My motivation for writing that article was to show how easily it can be done, in response to a question posted in the CodeProject Q&A section. I’ll admit that I was a bit surprised when it turned out that the poster still couldn’t get his solution to work.
Turns out that he was working on a solution for logging trades to a database, so I guess I’ll have to take another shot at describing how easily it actually can be done.
Obviously, this isn’t a fully-fledged trading system, but it’s still a take on the principles of how one can process market data using Microsoft Message Queuing and Microsoft SQL Server.
Screenshot of the server
Database
I’d like our little system to deal with a reasonable subset of the data a trading system typically requires, so we’re only going to log bids, offers, and actual trades.
So here is our table for storing bids:
CREATE TABLE Bid
(
    Id uniqueidentifier not null primary key,
    TimeStamp datetime not null,
    BidderId uniqueidentifier not null,
    InstrumentId uniqueidentifier not null,
    BidConditionsId uniqueidentifier not null,
    Value float not null,
    Volume float not null
)
go
Our table for storing offers:
CREATE TABLE Offer
(
    Id uniqueidentifier not null primary key,
    TimeStamp datetime not null,
    SellerId uniqueidentifier not null,
    SaleConditionsId uniqueidentifier not null,
    InstrumentId uniqueidentifier not null,
    Value float not null,
    Volume float not null
)
go
And finally our trade table:
CREATE TABLE Trade
(
    Id uniqueidentifier not null primary key,
    TimeStamp datetime not null,
    SellerId uniqueidentifier not null,
    BuyerId uniqueidentifier not null,
    TradeAgreementId uniqueidentifier not null,
    InstrumentId uniqueidentifier not null,
    Value float not null,
    Volume float not null
)
go
The tables can be created using the SQL\CreateDataTables.sql script from the Harlinn.Messaging.Server2 project; it's included with the solution source code.
Inserting Data
The following little query is one I find useful when writing code to perform operations against the database. This time it lists the column names of the Offer table.
select cols.name from sys.all_columns cols
join sys.tables t ON (t.object_id = cols.object_id)
where t.name = 'Offer'
As a certain level of performance is usually required for this kind of solution, we will work directly with SqlConnection and SqlCommand, and the query above keeps me from making too many blunders while creating the code.
public class Offer
{
    public const string INSERT_STATEMENT =
        "INSERT INTO Offer(Id,TimeStamp,SellerId,SaleConditionsId,InstrumentId,Value,Volume)" +
        " VALUES(@id,@timeStamp,@sellerId,@saleConditionsId,@instrumentId,@value,@volume)";

    public static void Insert(SqlConnection connection, PayloadOffer offer)
    {
        // The command is disposed as soon as the insert has completed
        using (SqlCommand command = connection.CreateCommand())
        {
            command.CommandText = INSERT_STATEMENT;
            // Parameters are bound by name, so the order in which they are added does not matter
            command.Parameters.Add("@id", SqlDbType.UniqueIdentifier).Value = offer.Id;
            command.Parameters.Add("@timeStamp", SqlDbType.DateTime).Value = offer.TimeStamp;
            command.Parameters.Add("@sellerId", SqlDbType.UniqueIdentifier).Value = offer.SellerId;
            command.Parameters.Add("@instrumentId",
                SqlDbType.UniqueIdentifier).Value = offer.InstrumentId;
            command.Parameters.Add("@saleConditionsId",
                SqlDbType.UniqueIdentifier).Value = offer.SalesConditionsId;
            command.Parameters.Add("@value", SqlDbType.Float).Value = offer.Value;
            command.Parameters.Add("@volume", SqlDbType.Float).Value = offer.Volume;
            command.ExecuteNonQuery();
        }
    }
}
As you see, working directly with SqlConnection and SqlCommand isn’t all that bad; maybe something to think about next time you’re tempted to take the Entity Framework for a spin.
As the code is quite similar for the Bid and Trade tables, I will skip going through those.
The Payload
The data we are going to send to the server is based on a simple class hierarchy consisting of PayloadBase, PayloadBid, PayloadOffer and PayloadTrade. PayloadBid, PayloadOffer and PayloadTrade are derived from PayloadBase.
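[Serializable]
public abstract class PayloadBase
{
    private Guid id;
    private DateTime timeStamp;
    public PayloadBase()
    {
    }
    // Discriminator implemented by each concrete payload class
    public abstract PayloadType PayloadType
    {
        get;
    }
    // Id and TimeStamp are read by the DB.<type>.Insert methods
    public Guid Id
    {
        get { return id; }
        set { id = value; }
    }
    public DateTime TimeStamp
    {
        get { return timeStamp; }
        set { timeStamp = value; }
    }
}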
PayloadType is used to discriminate between PayloadTrade, PayloadBid and PayloadOffer. While we could have used the is operator, I tend to feel that using a discriminator makes the code more readable.
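To make the discriminator concrete, here is a minimal sketch of how the enum and one derived class might look. The property names on PayloadOffer are taken from the Offer.Insert method shown earlier; the ordering of the enum members, the use of auto-properties, and the body of Initialize are assumptions made for illustration, not the code from the download. PayloadBase is repeated in compact form so that the sketch compiles on its own.

```csharp
using System;

// Assumed ordering: the article only shows that these three members exist.
public enum PayloadType
{
    Bid,
    Offer,
    Trade
}

[Serializable]
public abstract class PayloadBase
{
    public Guid Id { get; set; }
    public DateTime TimeStamp { get; set; }

    // Each concrete payload class reports its own kind.
    public abstract PayloadType PayloadType { get; }
}

[Serializable]
public class PayloadOffer : PayloadBase
{
    public Guid SellerId { get; set; }
    public Guid SalesConditionsId { get; set; }
    public Guid InstrumentId { get; set; }
    public double Value { get; set; }
    public double Volume { get; set; }

    public override PayloadType PayloadType
    {
        get { return PayloadType.Offer; }
    }

    // Hypothetical initializer that fills the object with placeholder test data.
    public void Initialize()
    {
        Id = Guid.NewGuid();
        TimeStamp = DateTime.Now;
        SellerId = Guid.NewGuid();
        SalesConditionsId = Guid.NewGuid();
        InstrumentId = Guid.NewGuid();
        Value = 100.0;
        Volume = 10.0;
    }
}
```

With this in place, code that only holds a PayloadBase reference can switch on payload.PayloadType and cast to the matching class, which is the pattern the server uses.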
Processing the Messages
Just like in the previous article, we will process incoming messages asynchronously, one at a time, using the OnReceiveCompleted method:
private void OnReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
{
    try
    {
        MessageQueue mq = (MessageQueue)source;
        if (mq != null)
        {
            try
            {
                System.Messaging.Message message = null;
                try
                {
                    message = mq.EndReceive(asyncResult.AsyncResult);
                }
                catch (Exception ex)
                {
                    LogMessage(ex.Message);
                }
                if (message != null)
                {
                    PayloadBase payload = message.Body as PayloadBase;
                    if (payload != null)
                    {
                        if (receivedCounter == 0)
                        {
                            firstMessageReceived = DateTime.Now;
                        }
                        receivedCounter++;
                        if ((receivedCounter % 10000) == 0)
                        {
                            TimeSpan ts = DateTime.Now - firstMessageReceived;
                            string messageText = string.Format(
                                "Received {0} messages in {1}", receivedCounter, ts);
                            LogMessage(messageText);
                        }
                        try
                        {
                            switch (payload.PayloadType)
                            {
                                case PayloadType.Bid:
                                    {
                                        PayloadBid bid = (PayloadBid)payload;
                                        DB.Bid.Insert(sqlConnection, bid);
                                    }
                                    break;
                                case PayloadType.Offer:
                                    {
                                        PayloadOffer offer = (PayloadOffer)payload;
                                        DB.Offer.Insert(sqlConnection, offer);
                                    }
                                    break;
                                case PayloadType.Trade:
                                    {
                                        PayloadTrade trade = (PayloadTrade)payload;
                                        DB.Trade.Insert(sqlConnection, trade);
                                    }
                                    break;
                            }
                        }
                        catch (Exception e)
                        {
                            if (isRunning)
                            {
                                LogMessage(e.Message);
                            }
                        }
                    }
                }
            }
            finally
            {
                if (isRunning)
                {
                    mq.BeginReceive();
                }
            }
        }
        return;
    }
    catch (Exception exc)
    {
        if (isRunning)
        {
            LogMessage(exc.Message);
        }
    }
}
If you are not familiar with how binary serialization works, you may be surprised by how simple it is to access the data passed on the queue – as you see, I just cast the retrieved payload to the base class using:
PayloadBase payload = message.Body as PayloadBase;
If I understood the posting that prompted me to write this article, this wasn’t quite as obvious as I initially believed it to be. Anyway, we are now ready to store our data using our simple but rather efficient DB.<type>.Insert methods, as shown in the switch above.
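The cast only works when the receiving queue deserializes the body with the same formatter the sender used. The following is a minimal sketch of how the receiving side could be wired up, assuming the classic System.Messaging assembly (.NET Framework with MSMQ installed). The QueueSetup class and the queue path are hypothetical names chosen for illustration, and this is not necessarily how the downloadable server does it.

```csharp
using System;
using System.Messaging;

public static class QueueSetup
{
    // Hypothetical private queue path, used for illustration only.
    public const string QueuePath = @".\Private$\TradeLog";

    public static MessageQueue OpenForReceive(ReceiveCompletedEventHandler handler)
    {
        // Create the private queue on first use.
        if (!MessageQueue.Exists(QueuePath))
        {
            using (MessageQueue.Create(QueuePath))
            {
            }
        }

        MessageQueue queue = new MessageQueue(QueuePath);

        // Without this assignment the default XmlMessageFormatter is used,
        // which cannot read a binary-serialized body.
        queue.Formatter = new BinaryMessageFormatter();

        // Completed receives are delivered to the handler, e.g. OnReceiveCompleted.
        queue.ReceiveCompleted += handler;
        queue.BeginReceive();
        return queue;
    }
}
```

The sending side would need the same formatter assigned to its MessageQueue before calling Send.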
The Message Generator
To take our system for a spin, we need a message generating utility, and it’s nearly identical to the one presented in the previous article, except for the SendMessages method:
private void SendMessages(int count)
{
    string message = string.Format("Sending {0} messages", count);
    LogMessage(message);
    DateTime start = DateTime.Now;
    for (int i = 0; i < count; i++)
    {
        // Cycle through the three payload types
        PayloadType payloadType = (PayloadType)(i % 3);
        PayloadBase payload = null;
        switch (payloadType)
        {
            case PayloadType.Bid:
                {
                    PayloadBid bid = new PayloadBid();
                    bid.Initialize();
                    payload = bid;
                }
                break;
            case PayloadType.Offer:
                {
                    PayloadOffer offer = new PayloadOffer();
                    offer.Initialize();
                    payload = offer;
                }
                break;
            case PayloadType.Trade:
                {
                    PayloadTrade trade = new PayloadTrade();
                    trade.Initialize();
                    payload = trade;
                }
                break;
        }
        messageQueue.Send(payload);
    }
    DateTime end = DateTime.Now;
    TimeSpan ts = end - start;
    message = string.Format("{0} messages sent in {1}", count, ts);
    LogMessage(message);
}
We just let the binary formatter do its job. It sends the correct information even though messageQueue.Send(payload); gets a reference to an object declared as a PayloadBase, because serialization works on the object's runtime type. As long as the class of the object is declared as [Serializable], the object will be serialized correctly.
Conclusion
With a more comprehensive example at your disposal, I hope I’ve succeeded in illustrating how to create a simple, yet efficient solution based on Microsoft Message Queuing capable of dealing with “real” data loads.
It still has ample room for optimization. The obvious one is to process more than one message at a time, and another is to use multiple paired MessageQueue components and SqlConnections to dequeue messages in parallel.
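As a rough sketch of the first optimization, payloads could be buffered and written in groups instead of one by one. The PayloadBatcher class below is hypothetical and not part of the download. It only collects items and hands each full batch to a callback; in the server, that callback is where one might begin a SqlTransaction, call the DB.<type>.Insert methods for every item, and commit once.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: collects items and flushes them in fixed-size batches.
public class PayloadBatcher<T>
{
    private readonly int batchSize;
    private readonly Action<IList<T>> flushAction;
    private List<T> pending;

    public PayloadBatcher(int batchSize, Action<IList<T>> flushAction)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException("batchSize");
        }
        if (flushAction == null)
        {
            throw new ArgumentNullException("flushAction");
        }
        this.batchSize = batchSize;
        this.flushAction = flushAction;
        this.pending = new List<T>(batchSize);
    }

    // Buffers one item and flushes as soon as the batch is full.
    public void Add(T item)
    {
        pending.Add(item);
        if (pending.Count >= batchSize)
        {
            Flush();
        }
    }

    // Hands the buffered items to the callback; does nothing when the buffer is empty.
    public void Flush()
    {
        if (pending.Count == 0)
        {
            return;
        }
        List<T> batch = pending;
        pending = new List<T>(batchSize);
        flushAction(batch);
    }
}
```

A server using something like this would also call Flush on shutdown, or on a timer, so that a partially filled batch is not left unwritten. The trade-off is that messages already taken off a non-transactional queue but not yet committed to the database would be lost if the process stopped in between.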
The main purpose has been to keep the solution simple – while still doing something useful.
Best regards,
Espen Harlinn
History
- 15th March, 2011: Initial posting