.NET / Scala Interop Using RabbitMQ

8 Oct 2015
Getting .NET code to talk to Scala code and vice versa using RabbitMQ

Introduction

I have recently started a new job where we use a mixture of technologies: lots of .NET, with the new work being Scala based. We are using Scala so that we can use the rich set of tools available for the JVM, such as Apache Spark, Cassandra and ZooKeeper. We do, however, have a lot of existing .NET code spread across a whole bunch of apps that we may need to communicate with.

We are pretty sure that the new code will be based loosely around microservices and will use some sort of Actor framework (maybe Akka and Akka.NET). At the time of writing, though, Akka and Akka.NET do not talk to each other; no "on the wire" protocol has been written to support this. I did read a very interesting post (the link has been lost, sorry) from one of the main authors of Akka, who stated that this is something they wanted to do, but that it was a VERY large piece of work and would not be happening soon.

So if you want some Akka code to talk to Akka.NET code, you are on your own. Let's think about that for a minute, and remove the need for an Actor framework from the picture altogether. What are we trying to make happen? Essentially, we want to find a way for some "event/message" raised in a piece of Scala code to trigger something in some .NET code.

There are some cool libraries to allow Java classes to be used from .NET such as IKVM, and there is also a nice project jnbridge which claims "Bridge between anything Java and anything .NET, anywhere" on its main web page.

I have seen IKVM used, and although it is a nice library, using Java classes from .NET is not what our use case needs. I have not used jnbridge, but one area that certainly looked cool was the ability to use the Java Message Service from .NET, which you can read about here: http://jnbridge.com/software/jms-adapter-for-net/overview

That looked like it might have worked, but I wanted something a little lighter and something I was more familiar with. I have used a number of different messaging technologies over the years, and one of them is always pretty easy to get up and running: RabbitMQ.

RabbitMQ has a whole bunch of clients, including a Java one and a .NET one. Our use case is Scala to .NET, but since Scala is a JVM-targeted language we can make use of the Java RabbitMQ client.

So, in essence, this article is about how to get Scala code to talk to .NET code and vice versa. I have chosen JSON as the message format. I did this because most languages have a number of different JSON libraries, and it is a nice lightweight format.

 

 

Common Prerequisites

There are a couple of things that both sides need for the demo app to work correctly, as well as some language-specific things (well, only one actually: the JSON serializer library). This section outlines the common prerequisites; the .NET and Scala sections of this article outline what is needed for each language.

 

As mentioned, both sides make use of RabbitMQ, so it needs to be installed and running. You will need to install the following to get RabbitMQ to work:

  1. Erlang OTP : http://www.erlang.org/download.html
  2. RabbitMQ : https://www.rabbitmq.com/download.html

 

Once you have installed these, it is a good idea to ensure that the following things are done:

  1. Make sure RabbitMQ is set up to run as an automatically starting service. The service name should be "RabbitMQ"
  2. Make sure that the RabbitMQ web management plugin has been installed and is working. There is an article on how to do that here: https://www.rabbitmq.com/management.html

Once you have done that, you should be able to check the web management plugin via the following URL: http://localhost:15672/#/

This should show you something like this:

[Screenshot: RabbitMQ web management page]

 

 

Where Is The Code

The code is available on my GitHub account:

https://github.com/sachabarber/ScalaToDotNetInteropUsingRabbitMq

 

 

 

The Demo App

The demo app is made up of the following

  • A single VS2013 solution for the .NET side
  • 2 different IntelliJ IDEA projects
    • Scala Publisher
    • Scala Subscriber

 

IMPORTANT NOTE

The demo apps are the bare bones of what you need to get .NET to talk to Scala and vice versa. This is not production ready at all. There is no exception handling at all, so please treat it for what it is: a demo.

 

The following diagrams illustrate the scenarios that I have personally tested (if the arrows seem a bit sketchy to you, you would be right: I forgot to make them one-way in Word, the diagramming tool of champions, so I had to touch them up in Paint.NET; please forgive me):

  • [Diagram] .NET publisher to .NET subscriber
  • [Diagram] Scala publisher to Scala subscriber
  • [Diagram] .NET publisher to Scala subscriber
  • [Diagram] Scala publisher to .NET subscriber

 

The general idea is as follows:

  • We will use RabbitMQ to communicate cross process
  • We will use a RabbitMQ header named "type" to pass a value that identifies the type of the message being sent (the demo sends it as the string "1" and the subscribers parse it to an Int), so that the subscriber knows how to deserialize the message data being received
  • The message body itself will be JSON
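
The idea in the list above can be sketched without RabbitMQ at all. The following is only an illustration, using nothing but the Scala standard library: `TaggedMessage`, `TypedEnvelopeSketch`, `wrap`, `handlers` and `dispatch` are hypothetical names that do not appear in the demo code, and the handler just echoes the JSON text rather than deserializing it.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for a RabbitMQ message: string headers plus a raw body.
final case class TaggedMessage(headers: Map[String, String], body: Array[Byte])

object TypedEnvelopeSketch {
  // Type id agreed between publisher and subscriber; the demo uses 1 for Person.
  val PersonType = 1

  // Publisher side: attach the type id as a "type" header next to the JSON bytes.
  def wrap(typeId: Int, json: String): TaggedMessage =
    TaggedMessage(Map("type" -> typeId.toString), json.getBytes(UTF_8))

  // Subscriber side: one handler per known type id.
  val handlers: Map[Int, String => String] = Map(
    PersonType -> (json => s"Person JSON: $json")
  )

  // Read the header, pick the handler, and pass it the decoded body.
  // Returns None when the header is missing, not a number, or an unknown id.
  def dispatch(message: TaggedMessage): Option[String] =
    for {
      raw     <- message.headers.get("type")
      typeId  <- scala.util.Try(raw.toInt).toOption
      handler <- handlers.get(typeId)
    } yield handler(new String(message.body, UTF_8))
}
```

Supporting a second message type would then, under these assumptions, just mean agreeing on another id and adding another entry to `handlers` on each subscriber.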

 

Here are some screenshots of things working:

Scala Publisher -> .NET Subscriber

[Screenshot: Scala publisher sending to the .NET subscriber]

.NET Publisher -> Scala Subscriber

[Screenshot: .NET publisher sending to the Scala subscriber]

 

 

The .NET Side

This section outlines how the .NET code works, and what you will need to get it to work.

 

.NET Prerequisites

  • JSON .NET: used for JSON serialization. It can be installed via NuGet, where the package name is Newtonsoft.Json
  • RabbitMQ client: the officially supported .NET client, installed via NuGet, where the package name is RabbitMQ.Client
 

Common Message Data

For the demo app, this simple data transfer object is used:

public class Person
{
    [JsonProperty(PropertyName="age")]
    public int Age { get; set; }

    [JsonProperty(PropertyName = "name")]
    public string Name { get; set; }

    public override string ToString()
    {
        return string.Format("Name : {0}, Age : {1}", Name, Age);
    }
}

The more eagle-eyed amongst you will see that I am using the JsonProperty attribute to control the name of the serialized property. We need to do this because there is a convention in Scala, when using its equivalent of a property-bag class (a case class), that property names start with a lower-case letter.

So we need to adjust for that somewhere. I chose to do this on the .NET side, as I am more familiar with JSON .NET than with the Scala Play JSON library.

Without this control over the serialization, the Scala Play JSON library fails to deserialize the .NET objects, as the casing is different: by default JSON .NET would write "Name" and "Age", while the Scala Reads shown later looks for "name" and "age".

 

The .NET Publisher

Here is the code for the .NET publisher:

using System;
using RabbitMQ.Client;
using System.Text;
using Common;
using Newtonsoft.Json;
using RabbitMQ.Client.Framing;
using System.Collections.Generic;
using System.Threading;

namespace Publisher
{
    class Program
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                Random rand = new Random();


                while (true)
                {
                    var message = GetMessage(rand);
                    var body = Encoding.UTF8.GetBytes(message);

                    // Tag the message with a "type" header so subscribers know how to deserialize the body
                    var properties = new BasicProperties();
                    properties.Headers = new Dictionary<string, object>();
                    properties.Headers.Add("type", "1");

                    channel.BasicPublish(exchange: "logs",
                                         routingKey: "",
                                         basicProperties: properties,
                                         body: body);
                    Console.WriteLine(" [x] Sent {0}", message);
                    Thread.Sleep(1000);
                }
            }
        }

        private static string GetMessage(Random rand)
        {
            Person p = new Person();
            p.Age = rand.Next(100);
            p.Name = string.Format("named from .NET {0}", p.Age);
            return JsonConvert.SerializeObject(p);
        }
    }
}

 

There are only a few points to note here:

  1. We create a new Person which is serialized to JSON
  2. We add a header value ("type" = "1") to the RabbitMQ headers collection. This enables consumers to examine the headers to see how to treat the message data
  3. We send the JSON serialized Person object using standard RabbitMQ code

 

The .NET Subscriber

Here is the code for the .NET subscriber:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Collections.Generic;
using Common;
using Newtonsoft.Json;

namespace Subscriber
{
    class Program
    {
        public static void Main()
        {
            Dictionary<int, Type> typelookup = new Dictionary<int, Type>();
            typelookup.Add(1, typeof(Person));

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");

                Console.WriteLine(" [*] Waiting for logs.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    // The "type" header arrives as UTF-8 bytes; decode it and look up the CLR type
                    var typeToLookupBytes = (Byte[])ea.BasicProperties.Headers["type"];
                    var typeToLookup = int.Parse(Encoding.UTF8.GetString(typeToLookupBytes));
                    var messageType = typelookup[typeToLookup];
                    // The body is UTF-8 encoded JSON; deserialize it to the looked-up type
                    var message = Encoding.UTF8.GetString(body);
                    var person = JsonConvert.DeserializeObject(message, messageType);

                    Console.WriteLine("[Received] message : {0}", person);
                };
                channel.BasicConsume(queue: queueName,
                                     noAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

There are only a few points to note here:

  1. We hook up a standard RabbitMQ consumer to listen for incoming messages
  2. We first examine the headers to see what type of message we are receiving
  3. We create a new Person by deserializing the incoming message body (which is a JSON serialized Person instance)

 

 

 

 

 

The Scala Side

This section outlines how the Scala code works, and what you will need to get it to work.

 

Scala Prerequisites

Here is an example of the SBT file for the Scala Subscriber:


name := "Subscriber"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "com.rabbitmq" % "amqp-client" % "3.5.5",
  "com.typesafe.play" % "play-json_2.11" % "2.4.3"
)

 

Common Message Data

For the demo app, this simple data transfer object is used:

import play.api.libs.json.{JsPath, Json, Writes, Reads}
import play.api.libs.functional.syntax._



trait RabbitJsonMessage
case class Person(name: String, age: Int) extends  RabbitJsonMessage
{
  override def toString : String = {
    s"Name : $name, Age : $age"
  }
}

object JsonImplicits {

  implicit val personWrites = new Writes[Person] {
    def writes(person: Person) = Json.obj(
      "name" -> person.name,
      "age" -> person.age
    )
  }

  implicit val personReads : Reads[Person] = (
    (JsPath \ "name").read[String] and
      (JsPath \ "age").read[Int]
    )(Person.apply _)
}

 

This is a bit more complex than the simple JSON .NET code we saw on the .NET side. This is largely down to how the Play JSON library works with Scala: it requires you to use the Reads/Writes traits, which you make available as Scala implicit vals, as shown above.

 

  • The Writes trait (a trait is like an interface with some implementation) writes a Person case class to a JsValue
  • The Reads trait reads into a Person case class from a JsValue
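
If the implicit val mechanism is new to you, the following standard-library-only sketch may help. It is not Play JSON: `SimpleWrites`, `SimpleJson`, `SimpleImplicits` and `ImplicitsDemo` are hypothetical names, the "JSON" is a hand-built String with no escaping, and the only point is to show how a method with an implicit parameter picks up an instance that an import has brought into scope, which is the same shape as calling Json.toJson with a Writes[Person] in scope.

```scala
// Simplified stand-in for Play JSON's Writes: it produces a String, not a JsValue.
trait SimpleWrites[A] {
  def writes(value: A): String
}

final case class Person(name: String, age: Int)

object SimpleJson {
  // The compiler fills in `w` from whatever SimpleWrites[A] is in implicit scope.
  def toJson[A](value: A)(implicit w: SimpleWrites[A]): String = w.writes(value)
}

object SimpleImplicits {
  // Hand-rolled writer for Person; no string escaping, purely for illustration.
  implicit val personWrites: SimpleWrites[Person] = new SimpleWrites[Person] {
    def writes(p: Person): String = s"""{"name":"${p.name}","age":${p.age}}"""
  }
}

object ImplicitsDemo {
  import SimpleImplicits._ // brings personWrites into implicit scope

  // No writer is passed explicitly; it is resolved at compile time.
  def render(p: Person): String = SimpleJson.toJson(p)
}
```

Without the import in `ImplicitsDemo`, the call to `SimpleJson.toJson` would not compile, which is why the demo code imports JsonImplicits._ wherever it converts a Person.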

 

 

 

The Scala Publisher

Here is the code for the Scala publisher:

import com.rabbitmq.client._

import java.util.HashMap
import java.nio.charset.StandardCharsets

import play.api.libs.json._

import JsonImplicits ._



object PublisherDemo {

  def main (args:Array[String]):Unit = {
    val r = new PublisherDemo ()
    r.Send
  }
}




class PublisherDemo {
  val EXCHANGE_NAME = "logs"

  def Send () = {

    val factory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout")

     for (i <- (0 to 100)) {

       val person = new Person("named from scala " + i.toString, i)
       val message = Json.toJson(person)
       val bytes =  message.toString.getBytes(StandardCharsets.UTF_8)
       // Tag the message with a "type" header so subscribers know how to deserialize the body
       val headers = new HashMap[String,AnyRef]()
       headers.putIfAbsent("type","1")


       val props = new AMQP.BasicProperties.Builder().headers(headers)
       channel.basicPublish(EXCHANGE_NAME,"",props.build() , bytes)
       System.out.println(" [x] Sent '" + message + "'")
       Thread.sleep(1000)
    }
    channel.close()
    connection.close()
  }
}

 

There are only a few points to note here:

  1. We create a new Person which is serialized to JSON
  2. We add a header value ("type" = "1") to the RabbitMQ headers collection. This enables consumers to examine the headers to see how to treat the message data
  3. We send the JSON serialized Person object using standard RabbitMQ code
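
One thing the publisher does not do is close the channel and connection if publishing throws part way through the loop. A minimal sketch of one way to handle that, assuming you are happy with a small helper of your own, is the "loan" pattern below. `ResourceSketch`, `withResource` and `FakeChannel` are hypothetical names that are not part of the demo or of the RabbitMQ client; `FakeChannel` exists only so the helper can be exercised without a broker.

```scala
object ResourceSketch {
  // Opens a resource, runs `body` with it, and always calls `close` afterwards,
  // even when `body` throws. The exception (if any) still propagates to the caller.
  def withResource[R, A](open: => R)(close: R => Unit)(body: R => A): A = {
    val resource = open
    try body(resource)
    finally close(resource)
  }
}

// Hypothetical stand-in for a channel, used only to exercise the helper.
final class FakeChannel {
  var closed = false
  def close(): Unit = closed = true
}

// With the publisher's own objects, usage would look roughly like this
// (untested against a broker, shown as comments for that reason):
//
//   ResourceSketch.withResource(factory.newConnection())(_.close()) { connection =>
//     ResourceSketch.withResource(connection.createChannel())(_.close()) { channel =>
//       // declare the exchange and publish here
//     }
//   }
```

Passing the close action explicitly keeps the helper independent of whether a particular client version's types implement AutoCloseable.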

 

 

The Scala Subscriber

Here is the code for the Scala subscriber:

import java.util.HashMap
import com.rabbitmq.client._
import play.api.libs.json.{JsValue, Json, Writes}
import JsonImplicits ._


object SubscriberDemo {

  def main (args:Array[String]): Unit = {
    val r = new SubscriberDemo()
    r.Receive()
  }
}


class SubscriberDemo {
  val EXCHANGE_NAME = "logs"

  def Receive() = {

      val factory = new ConnectionFactory()
      factory.setHost("localhost")
      val connection = factory.newConnection()
      val channel = connection.createChannel()

      channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
      val queueName = channel.queueDeclare().getQueue()
      channel.queueBind(queueName, EXCHANGE_NAME, "")

      val typelookup = new HashMap[Int, JsValue => RabbitJsonMessage]()
      typelookup.putIfAbsent(1,value =>
      {
          val person = Json.fromJson[Person](value).get
          person.asInstanceOf[RabbitJsonMessage]
      })

      System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
      val consumer = new DefaultConsumer(channel) {


      override def handleDelivery(consumerTag: String, envelope: Envelope,
          properties: AMQP.BasicProperties, body: scala.Array[scala.Byte] ) =
      {
          // Read the "type" header and pick the matching JSON converter
          val typeToLookup = properties.getHeaders().get("type").toString().toInt
          val jsonConverter = typelookup.get(typeToLookup)
          // Decode the body as UTF-8 JSON and convert it to a Person
          val messageBody = new String(body, "UTF-8")
          val jsonObject = Json.parse(messageBody)
          val person = jsonConverter(jsonObject).asInstanceOf[Person]
          System.out.println(" [x] Received '" + person + "'");
      }

    }
    channel.basicConsume(queueName, true, consumer)

  }
}

There are only a few points to note here:

  1. We hook up a standard RabbitMQ consumer to listen for incoming messages
  2. We first examine the headers to see what type of message we are receiving
  3. We create a new Person by deserializing the incoming message body (which is a JSON serialized Person instance)
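
The two subscribers read the "type" header differently: the .NET one casts it to Byte[] and decodes it, while the Scala one calls toString on whatever object the Java client hands over. Both also assume the header is present and maps to a known type. A slightly more defensive lookup could be sketched as below. This is an illustration only, using the Scala standard library; `HeaderDecodingSketch`, `typeId` and `handlerFor` are hypothetical names, and the set of shapes it accepts is my assumption rather than something the RabbitMQ clients guarantee.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

object HeaderDecodingSketch {
  // Turn a raw "type" header value into an Int, if possible.
  // Accepts an Int, UTF-8 bytes, or any object whose toString is a number.
  def typeId(raw: Any): Option[Int] = raw match {
    case null               => None
    case n: Int             => Some(n)
    case bytes: Array[Byte] => Try(new String(bytes, UTF_8).trim.toInt).toOption
    case other              => Try(other.toString.trim.toInt).toOption
  }

  // Look up a handler for the header value.
  // Missing, malformed or unknown ids give None instead of an exception.
  def handlerFor[A](raw: Any, handlers: Map[Int, A]): Option[A] =
    typeId(raw).flatMap(id => handlers.get(id))
}
```

In handleDelivery, a None result could then be logged and skipped, so that one unexpected message does not throw inside the consumer callback.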

 

 

 

 

That's All

That is all I wanted to say this time. If you like the article and think it is useful, votes and comments are always welcome.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
