Introduction
In this article, I'm going to show you how to create a collaborative todo list using Emitter for communication between clients.
Emitter is a distributed publish-subscribe platform that is open-source, secure, and extremely easy to use. If you would like an introduction to the basics of Emitter first, you can check the Getting Started video, the other article on CodeProject, and of course, the official website: emitter.io.
Technical Choices
Vue.js is easy to set up and easy to use, and its footprint is smaller than that of other frameworks such as Angular.js. The interface of this todo list started out as a sample hosted on the official Vue.js website. It's nice, but it only stores your todo list in your browser's local storage, so the list cannot be accessed remotely. I want to be able to share that todo list, and I want people to be able to read and amend it concurrently. For that purpose, instead of being stored in the browser's local storage, the list is going to be stored in a database on a server. For the sake of simplicity, I'll write the server in JavaScript using Node.js, and will use SQLite as the DBMS. For those unfamiliar with SQLite, this DBMS simply stores all the data in a single file on disk. It's efficient enough to be used in many professional applications, and its small footprint makes it a great candidate for embedded systems. The SQLite website has a detailed explanation of when SQLite is a good choice.
The Server
The Initialization
You can install SQLite for Node with a single npm command:
npm install sqlite3 --save
And that's it! It's ready to use. The following two lines of code suffice to open a database:
var sqlite3 = require("sqlite3");
var db = new sqlite3.Database("todos.db", sqlite3.OPEN_READWRITE | sqlite3.OPEN_CREATE);
If the database didn't exist, a new empty todos.db file was just created. At this point, we should check whether the database is empty, and create our schema if this is the case:
db.get("SELECT name FROM sqlite_master WHERE type='table' AND name='todos'", function(err, data)
{
    if (data)
    {
        // The todos table already exists: nothing to create.
        startListening();
    }
    else
    {
        // serialize() makes the statements below run one after the other.
        db.serialize(function()
        {
            db.run("CREATE TABLE todos (id INTEGER PRIMARY KEY AUTOINCREMENT, completed BOOLEAN DEFAULT 0, title TEXT, version INTEGER DEFAULT 0)");
            db.run("INSERT INTO todos (title) VALUES (\"Buy ketchup\")");
            db.run("INSERT INTO todos (title) VALUES (\"Pay internet bill\")");
            db.run("INSERT INTO todos (title) VALUES (\"Anna's birthday present\")",
                function()
                {
                    // Start listening only once the last insert is done.
                    startListening();
                });
        });
    }
});
The purpose of the startListening function is to instantiate the Emitter object, initiate a connection, and finally subscribe to the todo/cmd channel once the connection is ready:
var emitterKey = "9SN1Xg1DjvmeiSdpnS0WdKkrxlz0koBH";
var channel = "todo";
var emitter = undefined;
function startListening()
{
    emitter = require('emitter-io').connect(null, function()
    {
        console.log('emitter: connected');
        emitter.subscribe({
            key: emitterKey,
            channel: channel + "/cmd",
        });
        emitter.on('message', function(msg)
        {
            console.log('emitter: received ' + msg.asString());
            msg = msg.asObject();
            handle[msg.cmd](msg);
        });
    });
}
Messages sent to the server should always contain a cmd attribute, a verb describing the purpose of the request, along with the data required to execute the request. Emitter's message event handler only parses the message to turn it into an object (msg = msg.asObject()) and calls the right handler based on the verb in the request (handle[msg.cmd](msg)), given that handle is:
var handle = {
    "getall": handleGetAll,
    "add": handleAdd,
    "delete": handleDelete,
    "removeCompleted": handleRemoveCompleted,
    "complete": handleComplete,
    "edit": handleEdit
};
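Note that handle[msg.cmd](msg) throws a TypeError when a message carries a verb that is missing from the table. Below is a minimal sketch of a guarded dispatch. The dispatch helper and the stub handlers are hypothetical and only serve the illustration; the real handlers talk to SQLite and Emitter.

```javascript
// Hypothetical stub handlers that only record what they were asked to do.
var calls = [];
var handle = {
    "getall": function (msg) { calls.push("getall"); },
    "add": function (msg) { calls.push("add:" + msg.title); }
};

// Look up the handler for the verb and call it. Unknown verbs (and
// inherited property names such as "toString") are rejected instead of
// raising a TypeError.
function dispatch(msg) {
    var known = Object.prototype.hasOwnProperty.call(handle, msg.cmd);
    if (!known || typeof handle[msg.cmd] !== "function") {
        console.log("unknown command: " + msg.cmd);
        return false;
    }
    handle[msg.cmd](msg);
    return true;
}

dispatch({ cmd: "add", title: "Buy ketchup" }); // handled
dispatch({ cmd: "toString" });                  // rejected: not an own property
dispatch({ cmd: "drop" });                      // rejected: unknown verb
```

Whether unknown verbs should be ignored silently or answered with an err message is a design choice; the sketch simply logs them.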
Now let's review the code of the most interesting handlers.
Handling the Requests with SQLite
In the following code snippets, I'll use a helper function to publish messages through Emitter:
function publish(recipient, msg)
{
    emitter.publish({
        key: emitterKey,
        channel: channel + "/" + recipient,
        message: JSON.stringify(msg)
    });
}
There is not much to explain here. It simply calls Emitter's publish function, passing the emitterKey, building the name of the channel, and stringifying the message.
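To make the routing concrete, here is a small sketch that swaps the real Emitter client for a hypothetical in-memory stub (fakeEmitter) and records what the helper would send. The key placeholder and the sender value "client-42" are made up for the illustration.

```javascript
var emitterKey = "<your channel key>"; // placeholder, not a real key
var channel = "todo";

// Hypothetical stand-in for the Emitter client: it only records calls.
var sent = [];
var fakeEmitter = {
    publish: function (request) { sent.push(request); }
};

function publish(recipient, msg) {
    fakeEmitter.publish({
        key: emitterKey,
        channel: channel + "/" + recipient,
        message: JSON.stringify(msg)
    });
}

// A reply to one client goes to that client's own sub-channel...
publish("client-42", { cmd: "getall", todos: [] });
// ...while an update for everybody goes to the shared one.
publish("broadcast", { cmd: "add", todo: { id: 4, title: "Call Bob" } });

console.log(sent[0].channel); // todo/client-42
console.log(sent[1].channel); // todo/broadcast
```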
The getAll Handler
When a client displays the todo list for the first time, the first thing it must do is send a getall request to retrieve the full list:
function handleGetAll(msg)
{
    db.all("SELECT * FROM todos", function(err, rows){
        if (err)
            publish(msg.sender, {cmd: "err", err: err, request: msg});
        else
            publish(msg.sender, {cmd: "getall", todos: rows});
    });
}
This is the simplest handler. It calls all() on the db to fetch a set of rows. If, for whatever reason, the query yields an error, this error is sent back to the client together with the original request. Otherwise, it publishes the result of the query.
The Add Handler
function handleAdd(msg)
{
    db.run("INSERT INTO todos (title) VALUES (?)", [msg.title], function(err)
    {
        if (err)
            publish(msg.sender, {cmd: "err", err: err, request: msg});
        else
            publish("broadcast", {cmd: "add", todo:
                {id: this.lastID, completed: false, title: msg.title }});
    });
}
The same pattern is applied here. But note that:
- Parameters are injected into the query using a placeholder ?, and SQLite takes care of the potentially malicious code passed in the parameter.
- The result of the query is not just sent to the client that made the request, but broadcast by publishing it in the "broadcast" sub-channel, to which all clients must be subscribed.
- To have the full record to broadcast, only the id is missing. It is retrieved thanks to the lastID variable.
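To see why the placeholder matters, compare it with naive string concatenation. The sketch below does not touch a database; it only builds the SQL text that would be sent, and the malicious title is invented for the illustration.

```javascript
// A title crafted to break out of the string literal.
var title = "x'); DROP TABLE todos; --";

// Naive concatenation: the title becomes part of the SQL text itself.
var unsafeSql = "INSERT INTO todos (title) VALUES ('" + title + "')";

// Placeholder: the SQL text is fixed and the title travels separately
// as a bound value, so it is never parsed as SQL.
var safeSql = "INSERT INTO todos (title) VALUES (?)";
var params = [title];

console.log(unsafeSql);
// INSERT INTO todos (title) VALUES ('x'); DROP TABLE todos; --')
console.log(safeSql, params);
```

With the placeholder, the statement stays the same whatever the client sends, which is also what makes the lastID and changes bookkeeping predictable.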
The Edit Handler
This is the last handler that I'll present here. The remaining ones do not add much to the topic.
function handleEdit(msg)
{
    db.get("SELECT version FROM todos WHERE id = ?", [msg.id], function (err, row)
    {
        if (err)
        {
            console.log(err);
            publish(msg.sender, {cmd: "err", err: err, request: msg});
            return;
        }
        // No such row (for instance, it was deleted): nothing to update.
        if (!row) return;
        var newVersion = row.version + 1;
        // The update only matches if the version is still the one we just read.
        db.run("UPDATE todos SET title = ?, version = ? WHERE id = ? AND version = ?",
            [msg.title, newVersion, msg.id, row.version], function(err){
            if (err)
            {
                console.log(err);
                publish(msg.sender, {cmd: "err", err: err, request: msg});
                return;
            }
            // this.changes is 0 when another request updated the row first.
            if (this.changes)
                publish("broadcast", {cmd: "edit", todo: {id: msg.id,
                    title: msg.title, version: newVersion}});
        });
    });
}
The most important thing to note here is that each record has a version number that must be incremented with each update. So, first, we need to retrieve the record, then increment its version number, and finally request an update on the row. But we must execute this update filtering not only on the id, but also on the version number. On the one hand, all those requests are executed asynchronously; on the other hand, this read-write operation is not atomic. There is no guarantee that another request won't update the row between our select query and our update query. If such a thing happens, there is no need to update the row anymore, as the request we are handling here is already outdated. We ensure that we do not update a row with outdated data by selecting the row based on the version number: the update might then match no row, in which case the processing stops there and no update message is broadcast to the clients.
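The effect of the version filter can be simulated without a database. In this sketch, a plain object stands in for the todos table and the hypothetical updateIfUnchanged function mimics the UPDATE ... WHERE id = ? AND version = ? statement; both are assumptions made for the illustration.

```javascript
// In-memory stand-in for the todos table (hypothetical).
var table = { 1: { id: 1, title: "Buy ketchup", version: 0 } };

// The row is only modified if its version still matches the one that was
// read earlier. Returns the number of rows changed, which plays the role
// of this.changes in the real handler.
function updateIfUnchanged(id, expectedVersion, title) {
    var row = table[id];
    if (!row || row.version !== expectedVersion) return 0;
    row.title = title;
    row.version = expectedVersion + 1;
    return 1;
}

// Two requests read the row before either of them writes.
var versionSeenByA = table[1].version; // 0
var versionSeenByB = table[1].version; // 0

var changesA = updateIfUnchanged(1, versionSeenByA, "Buy mustard");
var changesB = updateIfUnchanged(1, versionSeenByB, "Buy mayonnaise");

console.log(changesA, changesB); // 1 0
console.log(table[1].title);     // Buy mustard
```

The second request changes nothing, so nothing is broadcast for it.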
The Client
The client is a little bit more complex, as it must handle several potential cases of conflicting updates. I'm going to focus on conflict resolution and leave matters related to Vue.js aside.
Much like on the server side, the code below will use the following publish helper function:
function publish(msg)
{
    emitter.publish({
        key: emitterKey,
        channel: channel + "/cmd",
        message: JSON.stringify(msg)
    });
}
Unlike on the server side, all messages here are aimed at a single recipient, the server. The channel to which the server is listening is cmd. All clients are supposed to send their commands through this channel.
Remember that each request has a cmd attribute. Well, responses to commands have the same cmd attribute, and are handled following the same principle as on the server side:
var handle = {
    "getall": handleGetAll,
    "add": handleAdd,
    "delete": handleDelete,
    "complete": handleComplete,
    "edit": handleEdit,
    "err": handleError
};
Now, let's take a look at the main handler:
emitter.on('message', function(msg){
    console.log('emitter: received ' + msg.asString() );
    msg = msg.asObject();
    if (app.$data.todos === undefined && msg.cmd != "getall")
        app.$data.cmdToApply.push(msg);
    else
    {
        if (!handle[msg.cmd](msg))
            app.$data.cmdToApply.push(msg);
    }
});
Again, the first thing a client must do is send a getall request in order to retrieve the full todo list. But there is no guarantee that the response to this request will be the first message the client receives. That's why, if the client is waiting for the response to a getall request (that is, the todo list is still undefined), and the update message it receives is not a getall response, this message should be stored so that it can be applied later.
Otherwise, this update should be applied immediately. Every handler should return a boolean indicating whether a message was successfully applied. If not, this message should, once again, be stored to be applied later.
function handleGetAll(msg)
{
    app.$data.todos = msg.todos;
    delayedApply();
    return true;
}
Once the client finally receives an answer to the getall request, it must initialize its todo list, then apply all the updates it received in the meantime. The delayedApply() function simply iterates through the array of updates and tries to apply them, one after the other, to the current state of our todo list:
function delayedApply()
{
    var remainingCommandsToApply = [];
    for (var i = 0; i < app.$data.cmdToApply.length; ++i)
    {
        var msg = app.$data.cmdToApply[i];
        var treated = handle[msg.cmd](msg);
        if (!treated)
            remainingCommandsToApply.push(msg);
    }
    app.$data.cmdToApply = remainingCommandsToApply;
}
Of course, an update is only removed from the cmdToApply array if it was successfully applied.
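The buffering logic can be exercised in isolation. The sketch below is a simplified variant, not the article's exact code: a plain state object stands in for app.$data, the handlers are reduced to the bare minimum, and a hypothetical drainPending function retries the buffered messages until a full pass applies nothing new.

```javascript
// Plain object standing in for app.$data (assumption for this sketch).
var state = { todos: undefined, cmdToApply: [] };

var handle = {
    "getall": function (msg) { state.todos = msg.todos; return true; },
    "add": function (msg) { state.todos.push(msg.todo); return true; },
    "edit": function (msg) {
        for (var i = 0; i < state.todos.length; ++i) {
            if (state.todos[i].id === msg.todo.id) {
                state.todos[i].title = msg.todo.title;
                return true;
            }
        }
        return false; // target not known yet: keep the message for later
    }
};

// Retry the buffered messages until a full pass applies nothing new.
function drainPending() {
    var progress = true;
    while (progress) {
        var pending = state.cmdToApply;
        var remaining = [];
        for (var i = 0; i < pending.length; ++i) {
            if (!handle[pending[i].cmd](pending[i])) remaining.push(pending[i]);
        }
        progress = remaining.length < pending.length;
        state.cmdToApply = remaining;
    }
}

function receive(msg) {
    if (state.todos === undefined && msg.cmd !== "getall") {
        state.cmdToApply.push(msg);   // list not loaded yet
    } else if (!handle[msg.cmd](msg)) {
        state.cmdToApply.push(msg);   // could not be applied yet
    } else {
        drainPending();               // something changed: retry the buffer
    }
}

// Messages arrive in the "wrong" order: edit, add, then getall.
receive({ cmd: "edit", todo: { id: 2, title: "Pay phone bill" } });
receive({ cmd: "add", todo: { id: 2, title: "Pay internet bill" } });
receive({ cmd: "getall", todos: [{ id: 1, title: "Buy ketchup" }] });

console.log(state.todos.map(function (t) { return t.title; }).join(", "));
// Buy ketchup, Pay phone bill
console.log(state.cmdToApply.length); // 0
```

Retrying until no progress is made is one possible way to cope with messages that depend on each other inside the buffer; it is not necessarily how the original client behaves.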
The next handler we are going to examine is the add handler:
function handleAdd(msg)
{
    console.log("add");
    if (isBuried(msg.todo.id)) return true;
    for (var i = 0; i < app.$data.todos.length; ++i)
    {
        var todo = app.$data.todos[i];
        if (todo.id == msg.todo.id) return true;
    }
    app.$data.todos.push(msg.todo);
    delayedApply();
    return true;
}
The first thing this function does is to check whether this todo was already "buried". That is, whether this todo was deleted by an update received earlier (see Tombstone (data store)). Indeed, another client could have received the add update for this todo, then deleted this todo. Then our client here might have received the delete update before it has even received the add update! In theory, that's a possibility we have to take into account...
function isBuried(id)
{
    return app.$data.cemetery.indexOf(id) !== -1;
}
The isBuried function simply checks whether the todo's id is resting in the cemetery.
Then, the handler checks whether, for whatever reason, this todo was already inserted. Finally, the handler pushes the todo record and calls the delayedApply() function, which will make sure all updates related to this newly added todo are applied.
Note that this handler always returns true. The todo could have just been successfully added, or might have been buried or added earlier. In any case, the client should not worry about this update message anymore.
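Here is a small sketch of the "delete overtakes add" scenario. The state object stands in for app.$data, and the handleDelete function and the shape of the delete message ({cmd: "delete", id: ...}) are assumptions, since the article does not show them.

```javascript
// Plain object standing in for app.$data (assumption for this sketch).
var state = { todos: [], cemetery: [] };

function isBuried(id) {
    return state.cemetery.indexOf(id) !== -1;
}

// Hypothetical delete handler: remember the id, then drop the todo if present.
function handleDelete(msg) {
    if (!isBuried(msg.id)) state.cemetery.push(msg.id);
    state.todos = state.todos.filter(function (t) { return t.id !== msg.id; });
    return true;
}

function handleAdd(msg) {
    if (isBuried(msg.todo.id)) return true; // deleted before it ever arrived
    for (var i = 0; i < state.todos.length; ++i) {
        if (state.todos[i].id === msg.todo.id) return true; // duplicate
    }
    state.todos.push(msg.todo);
    return true;
}

// The delete overtakes the add on the network.
handleDelete({ cmd: "delete", id: 7 });
handleAdd({ cmd: "add", todo: { id: 7, title: "Ghost todo" } });
handleAdd({ cmd: "add", todo: { id: 8, title: "Real todo" } });

console.log(state.todos.length);   // 1
console.log(state.todos[0].title); // Real todo
```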
Now let's take a look at the edit handler:
function handleEdit(msg)
{
    if (isBuried(msg.todo.id)) return true;
    for (var i = 0; i < app.$data.todos.length; ++i)
    {
        var todo = app.$data.todos[i];
        if (todo.id == msg.todo.id)
        {
            if (todo.version >= msg.todo.version) return true;
            todo.title = msg.todo.title;
            todo.version = msg.todo.version;
            return true;
        }
    }
    return false;
}
Once again, the first thing is to check whether the todo was already buried.
Then the handler iterates through the list of todos in search of the todo that should be updated. When this todo is found, it is only updated if its version number is lower than the version number passed in the request.
If the todo wasn't found in the current list, and wasn't buried either, then most likely the update message requesting the insertion of this todo hasn't been received yet. The handler therefore returns false, which will cause the message to be pushed onto the list of updates to apply later.
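As a worked example, consider two edits of the same todo that arrive in reverse order. The hypothetical applyEdit function below follows the same rules as the handler, but takes the list as a parameter so that it can run on its own; the cemetery check is left out for brevity.

```javascript
var todos = [{ id: 3, title: "Anna's birthday present", version: 0 }];

// Apply an edit only if it is newer than what the client already has.
// Returns false when the todo is unknown, so the caller can retry later.
function applyEdit(list, update) {
    for (var i = 0; i < list.length; ++i) {
        if (list[i].id === update.id) {
            if (list[i].version >= update.version) return true; // stale
            list[i].title = update.title;
            list[i].version = update.version;
            return true;
        }
    }
    return false;
}

// Version 2 overtakes version 1 on the network.
applyEdit(todos, { id: 3, title: "Anna's present: book", version: 2 });
applyEdit(todos, { id: 3, title: "Anna's present: scarf", version: 1 });
var unknown = applyEdit(todos, { id: 99, title: "?", version: 1 });

console.log(todos[0].title, todos[0].version); // Anna's present: book 2
console.log(unknown);                          // false
```

The stale edit is acknowledged (it returns true) but leaves the todo untouched, so every client ends up with the highest version it has seen.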
The delete and complete handlers follow the same principles, checking the cemetery and only applying the update when the version of the client is lower than the version proposed in the update.
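For illustration, a complete handler built on those principles could look like the sketch below. It is not the article's code: the state object stands in for app.$data, and the message shape ({cmd: "complete", todo: {id, completed, version}}) is an assumption.

```javascript
// Plain object standing in for app.$data (assumption for this sketch).
var state = {
    todos: [{ id: 1, title: "Buy ketchup", completed: false, version: 0 }],
    cemetery: [2]
};

function isBuried(id) {
    return state.cemetery.indexOf(id) !== -1;
}

// Hypothetical complete handler following the same rules as handleEdit.
function handleComplete(msg) {
    if (isBuried(msg.todo.id)) return true; // already deleted: ignore
    for (var i = 0; i < state.todos.length; ++i) {
        var todo = state.todos[i];
        if (todo.id === msg.todo.id) {
            if (todo.version >= msg.todo.version) return true; // stale
            todo.completed = msg.todo.completed;
            todo.version = msg.todo.version;
            return true;
        }
    }
    return false; // unknown todo: retry once its add has arrived
}

var applied = handleComplete({ cmd: "complete", todo: { id: 1, completed: true, version: 1 } });
var stale = handleComplete({ cmd: "complete", todo: { id: 1, completed: false, version: 1 } });
var buried = handleComplete({ cmd: "complete", todo: { id: 2, completed: true, version: 1 } });
var pending = handleComplete({ cmd: "complete", todo: { id: 5, completed: true, version: 1 } });

console.log(state.todos[0].completed);        // true
console.log(applied, stale, buried, pending); // true true true false
```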
Conclusion
That's it for the few tricks that help deal with the potential conflicts arising when trying to write a collaborative application. There is really not much to write about Emitter itself, precisely because it is so easy to use.
History
- 4th February, 2017: Initial version