Introduction
This article presents a simple stock explorer application that demonstrates how a publish/subscribe mechanism can be used to write classic request/response style applications. More specifically, it shows how to build a simple financial query service that combines Yahoo Finance with Morningstar financial information behind a simple querying API. We are going to use the emitter.io service to handle our publish/subscribe communication. The Emitter source code can be seen on GitHub.
Background
Emitter is a distributed publish/subscribe MQTT broker. In this article we assume some basic knowledge of MQTT and won’t go through the protocol specification or how to use it in detail. However, here are a couple of important points:
- In MQTT, clients can subscribe to topics (channels), which are represented by hierarchical strings (e.g. sensor/1/temperature/).
- Clients can publish arbitrary binary data to those channels.
- The fixed header of an MQTT packet can be as small as 2 bytes, and a variety of client libraries exist.
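The 2-byte figure refers to the fixed header: one byte for the packet type and flags, followed by a variable-length "remaining length" field of one to four bytes. The sketch below illustrates that encoding as we understand it from the MQTT specification. The function names are ours and are not part of any client library.

```javascript
// Encode the MQTT "remaining length" field: 7 bits of data per byte,
// with the high bit set when more bytes follow.
function encodeRemainingLength(length) {
  if (!Number.isInteger(length) || length < 0 || length > 268435455) {
    throw new RangeError('remaining length out of range');
  }
  var bytes = [];
  do {
    var digit = length % 128;
    length = Math.floor(length / 128);
    if (length > 0) {
      digit = digit | 0x80; // continuation bit
    }
    bytes.push(digit);
  } while (length > 0);
  return bytes;
}

// Build the fixed header of a QoS 0 PUBLISH packet (packet type 3, no flags).
function publishFixedHeader(remainingLength) {
  return [0x30].concat(encodeRemainingLength(remainingLength));
}
```

As long as the rest of the packet is shorter than 128 bytes, the fixed header stays at 2 bytes.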
We are going to implement a simple request-response topology in this article on top of a publish/subscribe protocol. The strategy is quite simple:
Server
- Subscribes to the quote-request channel.
- Every time a message is received, such as { symbol: "MSFT", reply: "1234" }, it processes the message and sends a response to the quote-response/xxx channel, where xxx is the reply value from the request, for example quote-response/1234.
Client
- Subscribes to the quote-response/xxx channel, where xxx is the identifier of the session: some unique value that only the client knows.
- Every time the user enters a ticker, it sends a message to the quote-request channel with the ticker and the identifier.
- Every time it receives a response, it processes it and binds it to the view.
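Before wiring this up to a real broker, the whole topology can be sketched in a few lines against an in-memory stand-in. Everything below is hypothetical scaffolding for illustration: createBroker() is not the emitter API, it only matches channels exactly, and the lookup function takes the place of the real financial provider.

```javascript
// A tiny in-memory stand-in for a broker (exact-match channels only).
function createBroker() {
  var handlers = {};
  return {
    subscribe: function (channel, handler) {
      (handlers[channel] = handlers[channel] || []).push(handler);
    },
    publish: function (channel, message) {
      (handlers[channel] || []).forEach(function (handler) {
        handler(message);
      });
    }
  };
}

// Server: listens on quote-request and replies on quote-response/<reply>.
function startServer(broker, lookup) {
  broker.subscribe('quote-request', function (message) {
    var request = JSON.parse(message);
    var response = JSON.stringify(lookup(request.symbol));
    broker.publish('quote-response/' + request.reply, response);
  });
}

// Client: subscribes to its private response channel, then sends requests.
function createClient(broker, sessionId, onQuote) {
  broker.subscribe('quote-response/' + sessionId, function (message) {
    onQuote(JSON.parse(message));
  });
  return {
    query: function (symbol) {
      broker.publish('quote-request', JSON.stringify({
        symbol: symbol,
        reply: sessionId
      }));
    }
  };
}
```

Two clients created with different session identifiers share the same quote-request channel, but each one only sees the responses to its own requests.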
Server
We are going to start by building the server, which receives requests on the quote-request channel and replies on the quote-response/... channel. We start by importing emitter along with the provider package that handles the business logic.
import (
	"encoding/json"
	"fmt"

	"./finance/provider"
	emitter "github.com/emitter-io/go"
)
In our main() function we start by initializing a new Provider, which will handle the financial querying of the data. Since I’d like to keep this article short, we’re not going to explore in depth how this is implemented, but feel free to explore the source code for this on GitHub.
p := provider.NewProvider()
o := emitter.NewClientOptions()
In the options we set the message handler, a function that gets called every time the server receives a message. Every time we receive a request, we parse it using json.Unmarshal() and call the GetQuotes() method on the provider, which returns a response containing the stock and financial information for the ticker symbol along with the dividend history. We then simply serialize the result and publish it to the quote-response/ channel, with the subchannel specified in the request, making sure that only the requester receives this response.
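o.SetOnMessageHandler(func(c emitter.Emitter, msg emitter.Message) {
	fmt.Printf("Received message: %s %v\n", msg.Payload(), msg.Topic())

	// Parse the request, e.g. {"symbol": "MSFT", "reply": "1234"}.
	var request map[string]string
	if err := json.Unmarshal(msg.Payload(), &request); err != nil {
		fmt.Println("Error: Unable to parse the request")
		return
	}

	// Look up the quote; an empty result is treated as a failure too.
	quotes, err := p.GetQuotes(request["symbol"])
	if err != nil || len(quotes) == 0 {
		fmt.Println("Error: Unable to process the request")
		return
	}

	// Serialize the first quote and publish it to the requester's channel.
	// key is the channel key for quote-response/, assumed to be defined elsewhere.
	response, err := json.Marshal(quotes[0])
	if err != nil {
		fmt.Println("Error: Unable to serialize the response")
		return
	}
	c.Publish(key, "quote-response/"+request["reply"], response)
})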
Finally, we start the server by creating a new emitter client using the NewClient() function, connecting to the broker, and subscribing to the quote-request channel so that we can receive the requests.
c := emitter.NewClient(o)
sToken := c.Connect()
if sToken.Wait() && sToken.Error() != nil {
	panic("Error on Client.Connect(): " + sToken.Error().Error())
}
c.Subscribe("FKLs16Vo7W4RjYCvU86Nk0GvHNi5AK8t", "quote-request")
Client
The client we’re going to build uses the Vue.js data binding framework to bind the results we receive from our server to the HTML DOM. Of course, you could use any other data binding framework, such as React, Angular or Durandal, to do so.
Our model, as shown below, consists of a symbol property, which is bound to the input box, and a result property, which is bound using simple handlebars-style tags. For more information, check out the index.html page, which contains all the layout and data binding. The model itself is rather simple, as you can see.
var vue = new Vue({
    el: '#app',
    data: {
        symbol: 'AAPL',
        result: new Object()
    },
    methods: {
        query: function () {
        },
    },
});
Client - Networking
We continue by implementing the networking part using emitter. The first order of business is to connect to the emitter broker. We simply call emitter.connect() and it connects us to the api.emitter.io:8080 endpoint, which is a free sandbox.
var emitter = emitter.connect({
    secure: true
});
Once we’re connected to the server, we need to subscribe to the quote-response/ channel with a suffix that represents a unique id of the current browser. The idea here is that we subscribe to a channel unique to this user session. That way, only that user gets notified with the response.
emitter.on('connect', function(){
    console.log('emitter: connected');
    emitter.subscribe({
        key: resKey,
        channel: "quote-response/" + getPersistentVisitorId()
    });
})
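The getPersistentVisitorId() helper is not shown in this article. One possible way to write such a helper is sketched below. The function names, the visitor-id storage key and the use of localStorage are all our assumptions rather than the code used by the application.

```javascript
// Hypothetical helper: returns a session identifier that survives page reloads.
// `storage` is anything with getItem/setItem, for example window.localStorage.
function getOrCreateVisitorId(storage, generate) {
  var id = storage.getItem('visitor-id');
  if (!id) {
    id = generate();
    storage.setItem('visitor-id', id);
  }
  return id;
}

// Generates a random identifier; not cryptographically strong.
function randomId() {
  return Math.random().toString(36).slice(2) + Date.now().toString(36);
}

// In the browser this could be wired up as:
//   function getPersistentVisitorId() {
//     return getOrCreateVisitorId(window.localStorage, randomId);
//   }
```

Passing the storage in as a parameter keeps the helper easy to try out without a browser.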
We then add a query method, which we call every time the search button is pressed. In this method, we simply publish a message (e.g. { symbol: "AAPL", reply: "12345" }) to the quote-request channel, where we provide a reply parameter so our server knows where to reply. This value is appended to the channel where our server publishes the response, e.g. quote-response/12345 in the example above.
query: function () {
    console.log('emitter: publishing ');
    emitter.publish({
        key: reqKey,
        channel: "quote-request",
        message: JSON.stringify({
            symbol: this.$data.symbol,
            reply: getPersistentVisitorId()
        })
    });
}
Finally, every time we receive a message, we simply convert it from JSON to an object using the msg.asObject() method, do some work on the data and bind the result to our view.
emitter.on('message', function(msg){
    var data = msg.asObject();
    console.log('emitter: received ', data);
    vue.$data.result = data;
});
Client - Charting
We are going to show a couple of graphs in our stock explorer results page:
- A graph to show the stock value with moving averages for 50 and 200 days.
- A graph to show the dividend history.
For the first graph, we are going to simply use the Yahoo Finance chart API. This can be used by simply providing the ticker to the chart.finance.yahoo endpoint, which returns an image. For example, http://chart.finance.yahoo.com/z?s=TSLA&t=6m&q=l&l=on&z=l&p=m50,m200 will show us the graph for Tesla Motors Inc., with moving averages for 50 and 200 days (green and red lines). As shown below, we can just replace the ticker with the data from our model by using {{result.Symbol}}.
<img class="col-sm-12 yahoo-chart" src="http://chart.finance.yahoo.com/z?s={{result.Symbol}}&t=6m&q=l&l=on&z=l&p=m50,m200" />
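If a symbol might contain characters that are not URL-safe, the image URL could instead be built in code and bound to the src attribute. The chartUrl() helper below is a hypothetical sketch. Its query parameters are simply copied from the example URL above.

```javascript
// Builds the chart image URL for a ticker symbol, escaping the symbol so it
// cannot break the query string.
function chartUrl(symbol) {
  return 'http://chart.finance.yahoo.com/z?s=' + encodeURIComponent(symbol) +
    '&t=6m&q=l&l=on&z=l&p=m50,m200';
}
```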
The second graph we want to present is the dividend history. We are going to use the Chartist JavaScript library for the visualisation. The function below draws the dividend chart when we receive the data as a response from the emitter broker. We simply iterate through the dividends, pushing the labels (month/day values) to the labels array and the corresponding values to the series array. Once this is done, we call the Chartist.Line() function to trigger the rendering of the chart.
function drawDividendChart(data){
    // Declare the arrays locally so they do not leak into the global scope.
    var labels = [];
    var series = [];
    data.DividendHistory.forEach(function(d){
        labels.push(formatDate(d.Date));
        series.push(d.Value);
    });
    new Chartist.Line('#dividends-chart', {
        labels: labels,
        series: [series]
    }, {
        fullWidth: true,
        axisX: {
            showGrid: false,
            // Show only every second label to avoid clutter.
            labelInterpolationFnc: function(value, index) {
                return index % 2 === 0 ? value : null;
            }
        }
    });
}
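The formatDate() helper used above is not listed in this article. A minimal sketch of what it could look like follows, together with a side-effect-free version of the label and series preparation. We assume here that each Date value is a string the JavaScript Date constructor can parse, such as "2016-02-12". The real response format may differ, and toChartData() is a name of our own.

```javascript
// Hypothetical helper: turns a date string into a "month/day" label.
// Uses UTC so the label does not shift with the viewer's time zone.
function formatDate(value) {
  var date = new Date(value);
  if (isNaN(date.getTime())) {
    return ''; // unparseable input gets an empty label
  }
  return (date.getUTCMonth() + 1) + '/' + date.getUTCDate();
}

// Splits the dividend history into the labels and series passed to Chartist.
function toChartData(data) {
  var labels = [];
  var series = [];
  (data.DividendHistory || []).forEach(function (d) {
    labels.push(formatDate(d.Date));
    series.push(d.Value);
  });
  return { labels: labels, series: [series] };
}
```

Keeping the data preparation separate from the rendering call makes it possible to check the labels without a DOM.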
Why Publish/Subscribe?
You might wonder why you’d want to use a publish/subscribe system for simple request-response communication. It is a bit like the foundation of a house: if you want to build a house that is just one floor high, you could decide to have a foundation that can only support one floor. That will work just fine. However, if you ever want to extend your house with an extra floor, you are in trouble, as there is no easy way to expand. You can think of publish/subscribe as the foundation of your application and request-response as your first floor. Perhaps request-response is the only thing you need right now, but at a later stage you might want to add other ways of communicating.
In the stock explorer application presented in this article, suppose you want to send push notifications to users based on specific events, such as stock levels going through certain thresholds. Ideally, this also works if a receiver is offline: they should receive the message upon connecting. With emitter, this can be done in two simple steps.
- Make sure that, when certain stock level events occur, messages are published to a channel that has, for example, the ticker symbol as (part of) the channel name. You could create sub-channels for each event type.
emitter.publish({
    key: "<channel key>",
    channel: "stocks/msft",
    ttl: 86400,
    message: "microsoft stock up by 1.0%"
});
- Each user can subscribe to channels to receive push notifications related to the stock ticker of that channel. A user could subscribe to complete channels or to subchannels.
emitter.on('connect', function(){
    emitter.subscribe({
        key: "<channel key>",
        channel: "stocks/msft"
    });
});
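On the publishing side, deciding when a price has gone through a threshold could look roughly like the sketch below. The thresholdEvent() function and its message wording are hypothetical. The channel naming and the ttl value follow the publish example in the first step.

```javascript
// Returns a notification to publish when the price crosses the threshold,
// or null when it stays on the same side of it.
function thresholdEvent(symbol, previous, current, threshold) {
  var wasAbove = previous >= threshold;
  var isAbove = current >= threshold;
  if (wasAbove === isAbove) {
    return null;
  }
  return {
    channel: 'stocks/' + symbol.toLowerCase(),
    ttl: 86400,
    message: symbol + (isAbove ? ' rose above ' : ' fell below ') + threshold
  };
}
```

A non-null result can be combined with a channel key and handed to emitter.publish().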
By using the pub/sub system, you have in fact done the following: implemented one-to-many communication, decoupled sender and receiver, used message storage to deliver messages with a delay if a receiver was offline, and created message filtering based on (sub)channels. All of that with just a couple of lines of code. Isn’t it awesome?
History
- 12/02/2016 - Initial Version of the article