Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Crossbar.io a quick look at it

0.00/5 (No votes)
24 Apr 2017 1  
A look at the crossbar.io (Autobahn messaging platform)

Introduction

A while ago someone posted another SignalR article on www.codeproject.com, and I stated hey you should take a look at this stuff : crossbar.io, and not liking to me someone that is not willing to die by the sword (afting living by it of course) I decided to put my money where my mouth was/is, and write a small article on crossbar.io.

 

So what its crossbar.io? Well quite simply it is a message broker that has many language bindings, that should all be able to communicate together seamlessly. 

Here is what the people being crossbar.io have to say about their own product

Crossbar.io is an open source networking platform for distributed and microservice applications. It implements the open Web Application Messaging Protocol (WAMP), is feature rich, scalable, robust and secure. Let Crossbar.io take care of the hard parts of messaging so you can focus on your app's features.

 

 

So tell me more about Crossbar.IO

crossbar.io claim to have these language bindings:

  • JavaScript in the browser *
  • JavaScript in Node *
  • Python
  • PHP
  • Java
  • C#
  • Erlang

 

So on the surface of it, this looks like quite a cool library allowing many different disparate applications talk to each other.

It is worth pointing out that these language bindings without a * are written by the community, they are NOT maintained by the crossbar.io developers, as such you may find differences between what works in the main actual crossbar.io maintained language binding.

Unfornately this is life, its exactly the same when you use other messaging frameworks with multi-language bindings like RabbitMQ and Kafka (which we will be looking at more later).

 

 

Where is the code?

For the little demo app that this article uses all the demo code is at my GitHub account : https://github.com/sachabarber/CrossbarioDemo

 

Installation

This section will guide you through the installation process

 

Installing Python

  1. Download python, install it to default not to "Program Files", make sure "Add to path" is YES
  2. Open a cmd line where you installed python
  3. using pip from python command line: pip install pypiwin32
  4. using pip from python command line: pip install crossbar


Verifying Crossbar.io installation

1. Validate install command line : which crossbar 

CLICK FOR LARGER IMAGE

 

2. Validate install command line : crossbar version

CLICK FOR LARGER IMAGE

 

 

 

The Demo

This section will outline some core crossbar.io idea before we dive into the demo code

Core Concepts

crossbar node

crossbar.io has this concept of  "node". A "node" is basically a running instance of the crossbar.exe process which is running an active configuration. Typically this would be once per machine/VM.

.crossbar folders

In order to configure the running crossbar node, there is a special folder where it will look, which is called ".crossbar". In this folder, there are a number of files, the MOST important of which is "config.json", which is what is used to actual configure the crossbar.exe process when it starts. This folder is magical/special and will be examined when you use the crossbar start command line (which we don't actually want to do, but more on this in just a moment).

When we issue a crossbar start command line, crossbar.exe will pick up the config file it finds at within the folder specified by the .crossbar name.

So what's it one of these "config.json" files.

Here is one that comes with the  .NET demos : https://github.com/crossbario/crossbar-examples, namely the https://github.com/crossbario/crossbar-examples/tree/master/hello/csharp demo (the crossbar.io  web site says you should be able to use the command line to scaffold a new project for your language of choice, but you xan't they found it too much work to manatin the templates so you need to grab them from the examples folder).

{
    "version": 2,
    "controller": {},
    "workers": [
        {
            "type": "router",
            "options": {
                "pythonpath": [
                    ".."
                ]
            },
            "realms": [
                {
                    "name": "realm1",
                    "roles": [
                        {
                            "name": "anonymous",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": false,
                                        "publisher": false
                                    },
                                    "cache": true
                                }
                            ]
                        }
                    ]
                }
            ],
            "transports": [
                {
                    "type": "web",
                    "endpoint": {
                        "type": "tcp",
                        "port": 8080
                    },
                    "paths": {
                        "/": {
                            "type": "static",
                            "directory": "../src/Web"
                        },
                        "ws": {
                            "type": "websocket"
                        }
                    }
                }
            ]
        },
        {
            "type": "guest",
            "executable": "Hello.exe",
            "arguments": [
                "ws://127.0.0.1:8080/ws",
                "realm1"
            ],
            "options": {
                "workdir": "../src/DotNet/Hello/bin/Debug/"
            }
        }
    ]
}

 

There are actual 2 workers in this one config file

  • A web site
  • A .NET application (Hello.exe)

 

So lets think about that for a minute, we have this config file, and we are supposed to run crossbar.exe with this configuration file, and it starts Hello.exe and some web site. mmmm, How do I debug Hello.exe if Crossbar.exe is running it.

Seems to me that this is the opposite of what we want, how do we debug. There are a few posts on this

I decided to take an alternative route to this, I thought I don't mind using the web dev tools (F12 in Chrome) to debug JavaScripts, but for .NET I want to use Visual Studio, so I get my .NET code to start Crossbar.io as an extra process, where I config the .NET worker in code, and pass the new Crossbar.exe process a modified version of the file above, that has the .NET Hello.exe config taken out of it.

I have to say if I had of been using RabbitMQ OR Kafka this would have been childs play. I prefer embedded APIs rather than having to submit my code to something else to run. Sure you MUST be able to configure stuff yourself, but why not start out that way.

 

RPC

crossbar.io allows workers to make Remote Procedure Calls to other workers in different languages

 

Pub/Sub

crossbar.io allows workers to publish and receive to/from other workers in different languages

 

The Demo Code

The demo code will show a single .NET worker and single JavaScript worker that will pub/sub and RPC to each other

 

The .NET code

Here is the entire .NET code for a publisher/subscriber, and RPC callable .NET program

using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using WampSharp.Core.Listener;
using WampSharp.V2;
using WampSharp.V2.Client;
using WampSharp.V2.Core.Contracts;
using WampSharp.V2.Realm;
using WampSharp.V2.Rpc;
using System.Diagnostics;

namespace Hello
{
    public class Program
    {
        static void Main(string[] args)
        {

#if DEBUG
            //use this for debugging
            Task.Factory.StartNew(() =>
            {
                Process process = new Process();
                process.StartInfo.FileName = @"c:\Users\sacha\AppData\Local\Programs\Python\Python36-32\Scripts\crossbar.exe";
                process.StartInfo.Arguments = @"start --cbdir C:\Users\sacha\Desktop\CrossbarIOExample\CrossBarDotNetExample\.crossbar";
                process.StartInfo.WindowStyle = ProcessWindowStyle.Maximized;
                process.Start();
                // Waits here for the process to exit, but since this is 
                // dedicated thread main thread is not blocked
                process.WaitForExit();

            }, TaskCreationOptions.LongRunning);

            //give CrossBar.io process time to start
            System.Threading.Thread.Sleep(1000 * 20);
#endif


            Console.WriteLine("WampSharp Hello demo starting ...");

            string wsuri = "ws://127.0.0.1:8080/ws";
            string realm = "realm1";
            if (args.Length > 0) {
               wsuri = args[0];
               if (args.Length > 1) {
                  realm = args[1];
               }
            }
            
            Task runTask = Run(wsuri, realm);

            Console.ReadLine();
        }

        private async static Task Run(string wsuri, string realm)
        {
            Console.WriteLine("Connecting to {0}, realm {1}", wsuri, realm);

            DefaultWampChannelFactory factory = new DefaultWampChannelFactory();

            IWampChannel channel =
                factory.CreateJsonChannel(wsuri, realm);

            IWampClientConnectionMonitor monitor = channel.RealmProxy.Monitor;
            
            monitor.ConnectionBroken += OnClose;
            monitor.ConnectionError += OnError;

            await channel.Open().ConfigureAwait(false);

            IWampRealmServiceProvider services = channel.RealmProxy.Services;

            // SUBSCRIBE to a topic and receive events
            ISubject<string> helloSubject = 
                services.GetSubject<string>("com.example.onhello");

            IDisposable subscription =
                helloSubject.Subscribe
                    (x => Console.WriteLine("event for 'onhello' received: {0}", x));

            Console.WriteLine("subscribed to topic 'onhello'");


            // REGISTER a procedure for remote calling
            Add2Service callee = new Add2Service();

            await services.RegisterCallee(callee)
                .ConfigureAwait(false);
            
            Console.WriteLine("procedure add2() registered");


            // PUBLISH and CALL every second... forever
            ISubject<int> onCounterSubject =
                services.GetSubject<int>("com.example.oncounter");

            ISubject<int> onDotNetCounterSubject =
              services.GetSubject<int>("com.example.ondotnetcounter");


            IMul2Service proxy =
                services.GetCalleeProxy<IMul2Service>();

            int counter = 0;

            while (true)
            {
                // PUBLISH an event
                onCounterSubject.OnNext(counter);
                Console.WriteLine("published to 'oncounter' with counter {0}", counter);

                onDotNetCounterSubject.OnNext(counter);
                Console.WriteLine("published to 'ondotnetcounter' with counter {0}", counter);
                counter++;


                // CALL a remote procedure
                try
                {
                    int result = await proxy.Multiply(counter, 3)
                        .ConfigureAwait(false);

                    Console.WriteLine("mul2() called with result: {0}", result);
                }
                catch (WampException ex)
                {
                    if (ex.ErrorUri != "wamp.error.no_such_procedure")
                    {
                        Console.WriteLine("call of mul2() failed: " + ex);
                    }
                }


                await Task.Delay(TimeSpan.FromSeconds(1))
                    .ConfigureAwait(false);
            }
        }

        #region Callee

        public interface IAdd2Service
        {
            [WampProcedure("com.example.add2")]
            int Add(int x, int y);
        }

        public class Add2Service : IAdd2Service
        {
            public int Add(int x, int y)
            {
                Console.WriteLine("add2() called with {0} and {1}", x, y);
                return x + y;
            }
        }

        #endregion

        #region Caller

        public interface IMul2Service
        {
            [WampProcedure("com.example.mul2")]
            Task<int> Multiply(int x, int y);             
        }

        #endregion

        private static void OnClose(object sender, WampSessionCloseEventArgs e)
        {
            Console.WriteLine("connection closed. reason: " + e.Reason);
        }

        private static void OnError(object sender, WampConnectionErrorEventArgs e)
        {
            Console.WriteLine("connection error. error: " + e.Exception);
        }
    }
}

This uses the ONLY .NET binding WampSharp Nuget package. This is developed by a community member, so updates may be patchy. Here be dragons (well possibly anyway)

 

The JavaScript Code

Here is the entire JavaScript code for a publisher/subscriber, and RPC callable JavaScript program

<!DOCTYPE html>
<html>
   <body>
      <h1>Hello WAMP</h1>
      <p>Open JavaScript console to watch output.</p>
      <script>AUTOBAHN_DEBUG = false;</script>
       <script src="js/autobahn.min.js"></script>

      <script>
         // the URL of the WAMP Router (Crossbar.io)
         //
         var wsuri;
         if (document.location.origin == "file://") {
            wsuri = "ws://127.0.0.1:8080/ws";

         } else {
            wsuri = (document.location.protocol === "http:" ? "ws:" : "wss:") + "//" +
                        document.location.host + "/ws";
         }


         // the WAMP connection to the Router
         //
         var connection = new autobahn.Connection({
            url: wsuri,
            realm: "realm1"
         });


         // timers
         //
         var t1, t2;


         // fired when connection is established and session attached
         //
         connection.onopen = function (session, details) {

            console.log("Connected");

            // SUBSCRIBE to a topic and receive events
            //
            function on_counter (args) {
               var counter = args[0];
               console.log("on_counter() event received with counter " + counter);
            }
            session.subscribe('com.example.oncounter', on_counter).then(
               function (sub) {
                  console.log('subscribed to topic');
               },
               function (err) {
                  console.log('failed to subscribe to topic', err);
               }
            );
			
			
	//SUBSCRIBE to a topic and receive events
            
            function on_dotnetcounter (args) {
               var counter = args[0];
               console.log("DOTNET : on_dotnetcounter() event received with counter " + counter);
            }
            session.subscribe('com.example.ondotnetcounter', on_dotnetcounter).then(
               function (sub) {
                  console.log('subscribed to topic');
               },
               function (err) {
                  console.log('failed to subscribe to topic', err);
               }
            );


            // PUBLISH an event every second
            //
            t1 = setInterval(function () {

               session.publish('com.example.onhello', ['Hello from JavaScript (browser)']);
               console.log("published to topic 'com.example.onhello'");
            }, 1000);


            // REGISTER a procedure for remote calling
            //
            function mul2 (args) {
               var x = args[0];
               var y = args[1];
               console.log("mul2() called with " + x + " and " + y);
               return x * y;
            }
            session.register('com.example.mul2', mul2).then(
               function (reg) {
                  console.log('procedure registered');
               },
               function (err) {
                  console.log('failed to register procedure', err);
               }
            );


            // CALL a remote procedure every second
            //
            var x = 0;

            t2 = setInterval(function () {

               session.call('com.example.add2', [x, 18]).then(
                  function (res) {
                     console.log("add2() result:", res);
                  },
                  function (err) {
                     console.log("add2() error:", err);
                  }
               );

               x += 3;
            }, 1000);
         };


         // fired when connection was lost (or could not be established)
         //
         connection.onclose = function (reason, details) {
            console.log("Connection lost: " + reason);
            if (t1) {
               clearInterval(t1);
               t1 = null;
            }
            if (t2) {
               clearInterval(t2);
               t2 = null;
            }
         }


         // now actually open the connection
         //
         connection.open();

      </script>
   </body>
</html>

As JavaScript IS AN OFFICIALLY supported binding, this should be kept up to date by the crossbar.io  folks.

 

Runtime Screenshots

And this is what we see when we run the solution in Visual Studio

CLICK FOR LARGER IMAGE

 

How do I debug and still run crossbar?

  • For the .NET code you can just use my trick where I start an extra crossbar.exe process and tell it what modified config file to use
  • For the web app just use the dev tools of your browser.

 

 

Comparison with others

It seems only fair that I do a comparison of it next to a few other messaging frameworks that I like/have used. Its worth noting that these all need something extra installed before you even get into any comparison

  • Crossbar needs python
  • Rabbit need Erlang
  • Kafka needs a JDK

 

 Multi LanguagePersisted messagedClusteredConfigurationDebugging ExperiencePub/SubRPC
Crossbar.ioYesNo (1)NoVia some weird .crossbar folder. May be possible to do same in code but probably a right nightmareBit naff to be honest. As crossbar wants to use the .crossbar folder config to start your "crossbar node". Which means it is running the app not your IDE. So you are forced to start crossbar.exe from your app, and pass it a config command line arg, and wait for it to be running.

Mmm
Yes, with easeYes, with ease
RabbitMQYesYesYesVia standard code/configJust run browser/IDE and put in breakpointsYes, with ease (queues)Yes, with correlationId
KafkaYesYesYesVia standard code/configJust run browser/IDE and put in breakpointsYes, with ease (topics)Yes, with correlationId + extra topic, not as easy as others

 

1 One way to deal with this is slap a database in the way so all producers write to the database first, then you just use event sourcing to supply the crossbar.io node with events, which it will broadcast out.

So as you can see there are far better solutions out there, but it really does depend on what your needs are.

 

Conclusion

This is a tricky part to write, as on one hand I like the fact that this is fairly easy to use, but on the other hand there are just too many factors that I don't like to make this my defacto goto messaging solution.

I like how different things can talk to each other fairly easily (all be it each language bindding offers a VERY different API). I don't like how debugging is hard, and I have to use this .crossbar folder to configure things.

I am also not a fan of the messaging not being durable. Sure you can get round it as I've stated but there are better solutions out there.

So for me its ok if all you really want is fire and forget, non persisted messaging, for anything else I would use RabbitMQ or Kafka

 

 

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here