Introduction
When you integrate new systems with legacy systems, you often have to deal with different platforms. The point of business integration is to connect different computer systems, diverse geographical locations, and dissimilar IT infrastructures so that they operate seamlessly together. One widely used platform for this is IBM MQ Series, which provides communication between applications, or between users and a set of applications, on dissimilar systems. It has grown in popularity as applications are made available over the Internet, because it supports over 35 platforms and can integrate disparate automation systems. For more details, check the IBM website: http://www-01.ibm.com/software/integration/wmq/.
Background
To follow this article, you should be familiar with asynchronous communication, the basics of messaging platforms, the Microsoft .NET Framework, and C#.
Using the code
This IBM MQ Series example has two main components: MQAdapter and Utilities. MQAdapter contains all the classes responsible for defining the MQ Series queue manager and channel properties, opening and closing connections, and executing the Push and Pop operations. It also contains custom exception handling that wraps MQ errors into readable errors for logging and auditing.
MQAdapter class: Its constructor creates a new instance of the MQ adapter from the queue manager name, channel, IP address, put and get queue names, timeout, character set, and port.
public MQAdapter(string mqManager, string channel, string ipAddress, string putQueue,
    string getQueue, int timeout, int charSet, int port)
{
    try
    {
        // Client connection settings.
        MQEnvironment.Hostname = ipAddress;
        MQEnvironment.Channel = channel;
        MQEnvironment.Port = port; // was hard-coded to 1000, ignoring the parameter

        mqQueueManagerName = mqManager;
        mqRequestQueueName = putQueue;
        mqResponseQueueName = getQueue;
        characterSet = charSet;
        pollingTimeout = timeout;

        // Connect to the queue manager. The third argument is the connection
        // name, which can also be written in the "host(port)" form.
        mqQueueManager = new MQQueueManager(mqManager, channel, ipAddress);

        // Open the request queue for output and the response queue for input.
        mqPutQueue = mqQueueManager.AccessQueue(putQueue, MQC.MQOO_INQUIRE |
            MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_SET_IDENTITY_CONTEXT);
        mqGetQueue = mqQueueManager.AccessQueue(getQueue, MQC.MQOO_INPUT_AS_Q_DEF |
            MQC.MQOO_FAIL_IF_QUIESCING);
    }
    catch (MQException mqe)
    {
        // Translate the numeric MQ reason code into a readable error.
        throw new MQAdapterException("Error Code: " +
            MQAdapterErrorReasons.GetMQFailureReasonErrorCode(mqe.Reason));
    }
}
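The article does not list MQAdapterErrorReasons itself, so the following is only a minimal sketch of how such a lookup might work. The class name MQAdapterErrorReasonsSketch, the fallback string, and the choice of codes are assumptions for illustration; the numeric values are standard IBM MQ reason codes, and a real table would be much longer.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for MQAdapterErrorReasons (not the article's actual class).
public static class MQAdapterErrorReasonsSketch
{
    // A small subset of IBM MQ reason codes and their symbolic names.
    private static readonly Dictionary<int, string> Reasons = new Dictionary<int, string>
    {
        { 2009, "MQRC_CONNECTION_BROKEN" },
        { 2033, "MQRC_NO_MSG_AVAILABLE" },
        { 2035, "MQRC_NOT_AUTHORIZED" },
        { 2058, "MQRC_Q_MGR_NAME_ERROR" },
        { 2059, "MQRC_Q_MGR_NOT_AVAILABLE" },
        { 2085, "MQRC_UNKNOWN_OBJECT_NAME" }
    };

    // Returns the symbolic name for a reason code, or a generic
    // fallback (an assumption of this sketch) when the code is not mapped.
    public static string GetMQFailureReasonErrorCode(int reason)
    {
        string name;
        return Reasons.TryGetValue(reason, out name) ? name : "UNMAPPED_REASON_" + reason;
    }
}
```

Returning the symbolic name keeps log entries searchable against the IBM documentation, which is presumably why the adapter compares against "MQRC_NO_MSG_AVAILABLE" when it detects a timeout.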
Push MQ Message: This function builds a request message, puts it on the supplied put queue over the connection opened in the constructor, and returns the message ID as a hexadecimal string.
public string PushMQRequestMessage(string message)
{
    try
    {
        // Build the request and tell the receiver where to send the reply.
        MQMessage requestMessage = new MQMessage();
        requestMessage.Persistence = MQC.MQPER_NOT_PERSISTENT; // same value as the literal 0
        requestMessage.ReplyToQueueName = mqResponseQueueName;
        requestMessage.ReplyToQueueManagerName = mqQueueManagerName;
        requestMessage.Format = MQC.MQFMT_STRING;
        requestMessage.CharacterSet = characterSet;
        requestMessage.MessageType = MQC.MQMT_REQUEST;

        // Use the same generated ID as message ID and correlation ID so the
        // response can be matched to this request later.
        requestMessage.MessageId = HexaDecimalUtility.ConvertToBinary(GenerateMQMsgId());
        requestMessage.CorrelationId = requestMessage.MessageId;

        MQPutMessageOptions pmo = new MQPutMessageOptions();
        pmo.Options = MQC.MQPMO_SET_IDENTITY_CONTEXT;

        requestMessage.WriteString(message);
        mqPutQueue.Put(requestMessage, pmo);

        // Return the ID as hex so the caller can pass it to GetMQResponseMessage.
        string _msgId = BinaryUtility.ConvertToHexaDecimal(requestMessage.MessageId);
        return _msgId;
    }
    catch (MQException mqe)
    {
        // Release the queue and the queue manager before reporting the error.
        if (mqPutQueue.OpenStatus)
            mqPutQueue.Close();
        if (mqQueueManager.OpenStatus)
            mqQueueManager.Close();
        throw new MQAdapterException("Error Code: " +
            MQAdapterErrorReasons.GetMQFailureReasonErrorCode(mqe.Reason));
    }
}
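The helpers HexaDecimalUtility.ConvertToBinary, BinaryUtility.ConvertToHexaDecimal, and GenerateMQMsgId belong to the Utilities component and are not listed here. The sketch below shows one way such conversions could be written; the class HexSketch and its method names are hypothetical, and the random ID generator is an assumption rather than the article's implementation. The only MQ-specific fact it relies on is that a message ID is 24 bytes long.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helpers illustrating the hex <-> byte[] round trip.
public static class HexSketch
{
    // byte[] -> uppercase hex string, two characters per byte.
    public static string ToHex(byte[] bytes)
    {
        var sb = new StringBuilder(bytes.Length * 2);
        foreach (byte b in bytes)
            sb.Append(b.ToString("X2"));
        return sb.ToString();
    }

    // Hex string -> byte[]; throws if the length is odd or a digit is invalid.
    public static byte[] FromHex(string hex)
    {
        if (hex.Length % 2 != 0)
            throw new ArgumentException("Hex string must have an even length.", "hex");
        var bytes = new byte[hex.Length / 2];
        for (int i = 0; i < bytes.Length; i++)
            bytes[i] = Convert.ToByte(hex.Substring(i * 2, 2), 16);
        return bytes;
    }

    // Assumed ID scheme: 24 random bytes, rendered as 48 hex characters.
    public static string NewMessageIdHex()
    {
        var id = new byte[24];
        using (var rng = RandomNumberGenerator.Create())
            rng.GetBytes(id);
        return ToHex(id);
    }
}
```

Passing the ID around as a hex string, as the adapter does, makes it easy to log and to hand back to the caller, while the byte array form is what the MessageId and CorrelationId properties expect.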
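Pop MQ Message: This function waits on the get queue, up to the configured timeout, for the response message whose correlation ID matches the request. Without a correlation match, MQ would return messages in FIFO order; here the get is restricted to the matching message.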
public string GetMQResponseMessage(string correlationId)
{
    // Ask only for the message whose correlation ID matches the request.
    MQMessage rsMsg = new MQMessage();
    rsMsg.CorrelationId = HexaDecimalUtility.ConvertToBinary(correlationId);

    MQGetMessageOptions gmo = new MQGetMessageOptions();
    gmo.Options = MQC.MQGMO_WAIT;
    gmo.MatchOptions = MQC.MQMO_MATCH_CORREL_ID;
    gmo.WaitInterval = pollingTimeout;

    try
    {
        mqGetQueue.Get(rsMsg, gmo);
        return rsMsg.ReadString(rsMsg.DataLength);
    }
    catch (MQException mqe)
    {
        // Release the queue and the queue manager before reporting the error.
        if (mqGetQueue.OpenStatus)
            mqGetQueue.Close();
        if (mqQueueManager.OpenStatus)
            mqQueueManager.Close();

        // No message within the wait interval means the request timed out.
        if (MQAdapterErrorReasons.GetMQFailureReasonErrorCode(mqe.Reason) ==
            "MQRC_NO_MSG_AVAILABLE")
            throw new MQAdapterTimeoutException("Message with correlation Id " +
                correlationId + " Timed out");

        throw new MQAdapterException("Error Code: " +
            MQAdapterErrorReasons.GetMQFailureReasonErrorCode(mqe.Reason));
    }
}
Because the communication is asynchronous, you have to provide a unique key so that the response can be correlated with its request; MQ Series supports this through the correlation ID, which the code above sets to the request's message ID. To offer a synchronous call on top of this, a SendMQRequestSync function was implemented that waits on the reply queue for a specific time to get the response.
To use the component, do as follows:
MQAdapter adapter = new MQAdapter("MqmanagerName",
    "RequestChannelName", "ServerIpAddress", "Queue.RequestName",
    "Queue.ResponseName", timeout, characterset, port);
adapter.SendMQRequestSync(strMessage);
The execution flow will be:
- Open a connection to the MQ Server.
- Push the MQ request into the request queue.
- Pop the MQ response from the response queue.
- If no response arrives before the timeout expires, throw a timeout exception.
P.S.: All MQ Series exceptions are wrapped in the adapter's own exception types (MQAdapterException or MQAdapterTimeoutException).
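SendMQRequestSync is not listed in the article, so the following is a minimal sketch of the execution flow above, not the actual implementation. It assumes the function simply pushes the request and then waits for the matching response. The interface IMQRequestReply, the class SyncRequestSketch, and the in-memory EchoAdapter are hypothetical and exist only so the sketch runs without an MQ server; TimeoutException stands in for MQAdapterTimeoutException.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction over the two adapter calls shown in this article.
public interface IMQRequestReply
{
    string PushMQRequestMessage(string message);        // returns the message ID as hex
    string GetMQResponseMessage(string correlationId);  // returns the reply or throws on timeout
}

public static class SyncRequestSketch
{
    // Push the request, then wait on the reply queue for the response
    // whose correlation ID equals the request's message ID.
    public static string SendMQRequestSync(IMQRequestReply adapter, string message)
    {
        string msgId = adapter.PushMQRequestMessage(message);
        return adapter.GetMQResponseMessage(msgId);
    }
}

// In-memory stand-in for a queue pair: every request gets an echo reply.
public class EchoAdapter : IMQRequestReply
{
    private readonly Dictionary<string, string> replies = new Dictionary<string, string>();
    private int counter;

    public string PushMQRequestMessage(string message)
    {
        string id = (++counter).ToString("X8");
        replies[id] = "ECHO:" + message;
        return id;
    }

    public string GetMQResponseMessage(string correlationId)
    {
        string reply;
        if (!replies.TryGetValue(correlationId, out reply))
            throw new TimeoutException("Message with correlation Id " +
                correlationId + " Timed out");
        replies.Remove(correlationId);
        return reply;
    }
}
```

With the stand-in, SyncRequestSketch.SendMQRequestSync(new EchoAdapter(), "ping") returns "ECHO:ping". Against a real queue manager, the blocking happens inside GetMQResponseMessage through the MQGMO_WAIT option and the wait interval.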
History
Version 1.0.