Graph Intersection - A Map Reduce Approach with Akka

14 Dec 2016 | CPOL | 8 min read
Graph intersection using map reduce and Akka

Introduction

This article introduces the Akka framework [1] by applying it to the graph intersection problem. We'll cover actors, clustering and a MapReduce approach to the problem.

Background

The problem 

Given a set of graphs I = {G1,...,Gn}, the graph intersection is the set O of arcs (ai,ak) that are common to every graph in I. We want to find this set O.
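As a point of reference, the definition can be written down directly, without any distribution. This is only a minimal sketch: it assumes nodes are integers and that a graph is represented just by its set of arcs; the names DirectIntersection, Arc and intersect are made up for the illustration and do not come from the article's code.

```scala
object DirectIntersection {
  // Assumption: an arc is a pair (first node, second node) of integer node values.
  type Arc = (Int, Int)

  // O = A1 ∩ A2 ∩ ... ∩ An. An empty input yields an empty result.
  def intersect(graphs: Seq[Set[Arc]]): Set[Arc] =
    if (graphs.isEmpty) Set.empty[Arc]
    else graphs.tail.foldLeft(graphs.head)(_ intersect _)
}
```

The rest of the article computes the same set O, but splits the work so that it can run on several machines.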

The MapReduce approach

The idea behind the MapReduce model is to split the computation over an input I into two phases:

  1. Map phase: a function executed in each concurrent unit of computation. Each unit applies the function to a subset of I
  2. Reduce phase: it post-processes the outputs of the Map phase and returns the final result

This approach is quite common in computations over massive data structures, stream processing and big data queries. Frameworks such as Apache Hadoop and Apache Spark follow the same model.

We want to solve the problem of graph intersection using this approach. The steps are:

  1. Given the input I = {G1,...,Gn}, we say that a generic Gi has a set of arcs Ai = {(a1,a2),...,(am,am+1)}
  2. We prepare the Map inputs by computing, for each Gi, a partition Ai = {Ai', Ai"} based on the parity of the first node value of each arc
  3. The Map phase: each of the two Map functions concurrently computes the intersection of the partitions assigned to it. So the input of the first Map will be {A1',...,An'} and the input of the second Map will be {A1",...,An"}
  4. The Reduce phase: it gathers the partial intersections, takes the union of the Map results and returns O
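The four steps above can be sketched in plain Scala as follows. This is an illustration under stated assumptions, not the code shipped with the article: nodes are assumed to be integers, the two Map calls run sequentially instead of on separate workers, and all names (MapReduceIntersection, partitionByParity, mapPhase, reducePhase) are hypothetical. The split is safe because an arc has the same first-node parity in every graph, so a common arc always ends up in the same partition.

```scala
object MapReduceIntersection {
  // Assumption: an arc is a pair (first node, second node) of integer node values.
  type Arc = (Int, Int)

  // Step 2: split one graph's arcs by the parity of the first node.
  // Returns (arcs with an even first node, arcs with an odd first node).
  def partitionByParity(arcs: Set[Arc]): (Set[Arc], Set[Arc]) =
    arcs.partition { case (first, _) => first % 2 == 0 }

  // Step 3 (Map): intersect the partitions assigned to one Map call.
  def mapPhase(partitions: Seq[Set[Arc]]): Set[Arc] =
    if (partitions.isEmpty) Set.empty[Arc]
    else partitions.tail.foldLeft(partitions.head)(_ intersect _)

  // Step 4 (Reduce): O is the union of the partial intersections.
  def reducePhase(partials: Seq[Set[Arc]]): Set[Arc] =
    partials.foldLeft(Set.empty[Arc])(_ union _)

  def intersect(graphs: Seq[Set[Arc]]): Set[Arc] = {
    val (evens, odds) = graphs.map(partitionByParity).unzip
    // In the cluster the two Map calls run concurrently on different nodes;
    // here they run one after the other to keep the sketch self-contained.
    reducePhase(Seq(mapPhase(evens), mapPhase(odds)))
  }
}
```

With the three graphs used later in the article (g1, g2 and g3), the odd partition intersects to the empty set and the even partition to the single arc (2,3).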

Example:

Image 1

Akka framework and libraries used

Akka is a framework for writing reactive systems [2], so it is well suited to production-ready distributed computation. It provides both Java and Scala APIs, and in this article we'll cover only a subset of its functionality:

  • Actors: the building blocks of an asynchronous, non-blocking and highly performant message-driven programming model. They are location transparent.

  • Akka cluster: it provides a peer-to-peer based cluster membership service with features to achieve fault tolerance.

  • Spray framework: it allows building REST/HTTP-based functionality on top of Akka.

Technical notes

The cluster is composed of a master machine M, which exposes the REST APIs and runs the Akka seed nodes as well as the reducer, and 4 slave machines Si, which run the map function. Of course, the following configuration is not intended to be the best one in every scenario; it should be considered an easy, inexpensive yet generic set-up for developing and running the code in a distributed scenario.

  • machine M
    • OS: Ubuntu 15.10, kernel 4.2.0-42-generic
    • JDK: 4.2.0-42-generic
    • SBT/Scala: 0.13.12/2.10.6
  • slave machines Si
    • OS: Ubuntu 16.04 LTS, kernel 4.4.0-21-generic
    • JDK: 4.2.0-42-generic
    • SBT/Scala: 0.13.12/2.10.6
  • Network notes
    • M communicates with each Si over TCP/IP

Code overview

Highlights

application.conf 

  • remote.netty.tcp.hostname (the machine you want to run the actor system on)
  • cluster.seed-nodes (the initial contact points of the cluster for the other nodes)
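For orientation, the two application.conf settings listed above could look roughly like the fragment below. It is a sketch, not the file shipped with the download: the address 192.168.1.30 is the master machine used later in this article, while the port 2551 and the actor system name ClusterSystem are assumptions and must match your own set-up.

```
akka {
  remote.netty.tcp {
    hostname = "192.168.1.30"   # the machine this actor system runs on
    port = 2551                 # assumed port
  }
  cluster.seed-nodes = [
    # "ClusterSystem" is an assumed actor system name
    "akka.tcp://ClusterSystem@192.168.1.30:2551"
  ]
}
```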

worker.conf

  • remote.netty.tcp.hostname (the worker machine)
  • contact-points (the initial contact points of the cluster)

Main.scala

  • This is the entry point of the application. Depending on the args, it starts the WebServer, Master or Worker node, according to the cluster role of the current machine
object Main {

  def main(args: Array[String]): Unit = {
    ...
  }
}
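One way to picture the role selection is a small pure function from the args to a role. This is a hypothetical sketch: the argument values M1 and WS and the no-argument Worker start are taken from the sbt commands used in the "Run the cluster" steps of this article, while the Role type and the roleFor function are invented for the illustration and are not part of the downloadable code.

```scala
object RoleDispatch {
  sealed trait Role
  case object MasterRole extends Role
  case object WebServerRole extends Role
  case object WorkerRole extends Role

  // Assumed mapping: `run M1` starts the Master, `run WS` starts the WebServer,
  // and `run` without arguments starts a Worker.
  def roleFor(args: Array[String]): Role = args.headOption match {
    case Some("M1") => MasterRole
    case Some("WS") => WebServerRole
    case _          => WorkerRole
  }
}
```

Keeping the decision in a function like this makes it easy to test separately from the code that actually starts the actor systems.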

WebServer.scala

  • This actor exposes two services:
    • /random (it generates 5 random graphs, submits the work to the Master, waits for the result and replies to the HttpRequest)

    • /submit (it unmarshals the HttpRequest payload into a Work instance, submits it to the Master, waits for the result and replies to the HttpRequest)

Master.scala

  • It maintains a list of the Workers currently registered to the cluster
  • It accepts works from the WebServer, splits them into subworks and distributes these to the Workers. Once all Workers have returned their results (Map function), it performs the Reduce phase and returns the result to the WebServer
  • It maintains the current state of the computation through a WorkState instance. So, for instance, it can resubmit a subwork after a Worker computation failure

Worker.scala

  • Every slave machine runs an instance of this actor. Once initialized, it joins the cluster and notifies the Master
  • It waits for subworks from the Master, sends them to the WorkExecutor and returns the Map function result

WorkExecutor.scala

  • This actor communicates with a Worker and executes the Map function.

MasterWorkerProtocol.scala

  • Defines the communication protocol between Master and Workers

Work.scala

  • It describes and identifies the input of the graph intersection problem and the related subworks (partitions)
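To make the relation between a work and its subworks concrete, here is a minimal sketch of such a model. Only the workId field name appears in the article's snippets; the Work and SubWork shapes, the parentId field and the split function below are assumptions made for illustration and may differ from the real Work.scala.

```scala
object WorkModel {
  // Assumption: an arc is a pair (first node, second node) of integer node values.
  type Arc = (Int, Int)

  // The whole problem instance: named graphs, each given by its arc set.
  case class Work(workId: String, graphs: Map[String, Set[Arc]])

  // One partition of a Work: the input of a single Map call.
  case class SubWork(workId: String, parentId: String, partitions: Seq[Set[Arc]])

  // Split a Work into two subworks by the parity of each arc's first node.
  def split(work: Work): Seq[SubWork] = {
    val (evens, odds) = work.graphs.values.toSeq
      .map(arcs => arcs.partition(arc => arc._1 % 2 == 0))
      .unzip
    Seq(
      SubWork(work.workId + "-even", work.workId, evens),
      SubWork(work.workId + "-odd", work.workId, odds)
    )
  }
}
```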

WorkState.scala

  • It describes the state of the computation and is maintained (as a singleton) by the Master
  • It defines a few data structures:
    • worksPending: contains the Works received by the Master and not yet reduced
    • worksAccepted: contains the Works and only their unprocessed subworks
    • worksInProgress: contains the Works and only their subworks that have been submitted and are still being executed by the Workers
    • worksDone: contains the Works and only their subworks successfully executed by the Workers
    • worksReduced: contains the Works and their reduced results. Once a Work is reduced it is removed from worksPending
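The life cycle of a subwork across these collections can be sketched as an immutable state with one transition per event. The five collection names come from the list above; everything else (string identifiers instead of Work instances, the method names accepted, started, completed, failed and reduced) is an assumption made to keep the example short, so treat it as a model of the idea rather than the actual WorkState.scala.

```scala
object WorkStateSketch {
  type WorkId = String
  type SubWorkId = String
  type Arc = (Int, Int)
  type SubWorks = Map[WorkId, Set[SubWorkId]]

  case class WorkState(
      worksPending: Set[WorkId] = Set.empty,
      worksAccepted: SubWorks = Map.empty,
      worksInProgress: SubWorks = Map.empty,
      worksDone: SubWorks = Map.empty,
      worksReduced: Map[WorkId, Set[Arc]] = Map.empty) {

    private def subs(m: SubWorks, w: WorkId): Set[SubWorkId] =
      m.getOrElse(w, Set.empty[SubWorkId])
    private def add(m: SubWorks, w: WorkId, sub: SubWorkId): SubWorks =
      m.updated(w, subs(m, w) + sub)
    private def remove(m: SubWorks, w: WorkId, sub: SubWorkId): SubWorks =
      m.updated(w, subs(m, w) - sub)

    // A new Work arrives: it becomes pending and all its subworks are unprocessed.
    def accepted(w: WorkId, subWorks: Set[SubWorkId]): WorkState =
      copy(worksPending = worksPending + w, worksAccepted = worksAccepted.updated(w, subWorks))

    // A subwork is handed to a Worker.
    def started(w: WorkId, sub: SubWorkId): WorkState = {
      require(subs(worksAccepted, w).contains(sub), s"$sub was expected to be in worksAccepted")
      copy(worksAccepted = remove(worksAccepted, w, sub), worksInProgress = add(worksInProgress, w, sub))
    }

    // A Worker returned the Map result for a subwork.
    def completed(w: WorkId, sub: SubWorkId): WorkState = {
      require(subs(worksInProgress, w).contains(sub), s"$sub was expected to be in worksInProgress")
      copy(worksInProgress = remove(worksInProgress, w, sub), worksDone = add(worksDone, w, sub))
    }

    // A Worker failed: the subwork goes back to worksAccepted so it can be resubmitted.
    def failed(w: WorkId, sub: SubWorkId): WorkState = {
      require(subs(worksInProgress, w).contains(sub), s"$sub was expected to be in worksInProgress")
      copy(worksInProgress = remove(worksInProgress, w, sub), worksAccepted = add(worksAccepted, w, sub))
    }

    // The Reduce phase finished: store the result and drop the Work from worksPending.
    def reduced(w: WorkId, result: Set[Arc]): WorkState =
      copy(worksPending = worksPending - w, worksReduced = worksReduced.updated(w, result))
  }
}
```

Modelling each event as a function from one state to the next keeps the pre-conditions (the require calls) next to the transition they protect.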

Running the code

In this section we execute the code in the cluster in different scenarios. The following schema shows the machines involved and where the actors introduced above run. Disclaimer: remember to change application.conf and worker.conf according to the addresses/hostnames of the machines on which you are running the code

Image 2

Empirical test 1 - A simple execution

Overview

Once the cluster set-up is complete, we call /submit with the graphs above as the payload. Then we'll wait for the response (hopefully the arc (2,3) :-P)

Run the cluster

  1. Start Master on 192.168.1.30: copy the source code locally and run sbt 'run M1' in a bash shell
  2. Start WebServer on 192.168.1.30: copy the source code locally and run sbt 'run WS' in a bash shell
  3. Start a Worker/WorkExecutor on 192.168.1.53: copy the source code locally and run sbt run in a bash shell
  4. Perform a POST call to http://192.168.1.30:8080/submit with payload
[["g1",[[1,2],[2,3]]],["g2",[[2,4],[2,3]]],["g3",[[2,3],[2,4],[4,2]]]]
  5. Render the HTML returned by the service call. The last line shows the result and, as expected, the intersection of the three graphs g1, g2 and g3 is [2,3]!
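If you want to generate payloads for /submit instead of typing them by hand, the shape used in step 4 (a list of [name, list of arcs] entries) is easy to render. The helper below is a hypothetical sketch: SubmitPayload and toJson are not part of the article's code, and it assumes graph names contain no characters that need JSON escaping.

```scala
object SubmitPayload {
  // Assumption: an arc is a pair (first node, second node) of integer node values.
  type Arc = (Int, Int)

  // Renders named graphs as [["g1",[[1,2],[2,3]]],["g2",[[2,4],[2,3]]]].
  def toJson(graphs: Seq[(String, Seq[Arc])]): String =
    graphs.map { case (name, arcs) =>
      val arcsJson = arcs.map { case (first, second) => s"[$first,$second]" }.mkString("[", ",", "]")
      "[\"" + name + "\"," + arcsJson + "]"
    }.mkString("[", ",", "]")
}
```

The resulting string can then be sent as the body of the POST call with any HTTP client.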

Conclusion

In this test we have familiarized ourselves with the problem and with how to submit an instance of it. It is also a good chance to check out the Spray implementation used to consume/serve HTML/JSON content over HTTP.

Empirical test 2 - Showing the load distribution across nodes

Run the cluster

  1. Start Master on 192.168.1.30: copy the source code locally and run sbt 'run M1' in a bash shell

  2. Start WebServer on 192.168.1.30: copy the source code locally and run sbt 'run WS' in a bash shell

  3. Start a Worker/WorkExecutor on 192.168.1.53: copy the source code locally and run sbt run in a bash shell. For the sake of the example, uncomment the line that adds a workload (a few seconds) to the Map phase.

    package worker
    ...
    class WorkExecutor extends Actor {
    def receive = {
    ...
    simulateWorkload(80000)
  4. Scenario 1: Perform 15 /random service calls. This simulates 15 concurrent problem submissions (5 random graphs of 10 arcs each) to the cluster equipped with 1 slave node.

  5. Add a Worker/WorkExecutor to the cluster: on 192.168.1.54 copy the source code locally and run sbt run in a bash shell.

  6. Scenario 2: Perform 15 /random service calls. This simulates 15 concurrent problem submissions (5 random graphs of 10 arcs each) to the cluster equipped with 2 slave nodes.

  7. Add a Worker/WorkExecutor to the cluster: on 192.168.1.55 copy the source code locally and run sbt run in a bash shell.

  8. Scenario 3: Perform 15 /random service calls. This simulates 15 concurrent problem submissions (5 random graphs of 10 arcs each) to the cluster equipped with 3 slave nodes.

  9. Add a Worker/WorkExecutor to the cluster: on 192.168.1.56 copy the source code locally and run sbt run in a bash shell.

  10. Scenario 4: Perform 15 /random service calls. This simulates 15 concurrent problem submissions (5 random graphs of 10 arcs each) to the cluster equipped with 4 slave nodes.

The following chart summarizes the results.

Image 3

Conclusion

This test shows how easy it is to add nodes to (and remove nodes from) the cluster. Note that the Master is NOT aware of any physical location detail of any of the nodes. Pretty cool, isn't it?!

Keen to discover some of the other features implemented? Try shutting down a node while it is computing, or uncomment the following line to simulate a RuntimeException. The Master will be able to redistribute the failed computation to other nodes!

    package worker
    ...
    class WorkExecutor extends Actor {
    def receive = {
    ...
    simulateRandomException()

Points of Interest

This programming experience let me both discover some emerging frameworks and languages and face the challenge of developing distributed algorithms. Here are some considerations:

  • With Scala the code is cleaner than with Java, thanks to functional concepts like pattern matching, e.g. the command-handling code in an Actor:
    override def receiveCommand: Receive = {
    case MasterWorkerProtocol.RegisterWorker(workerId) =>
    and the manipulation of data structures through anonymous functions, e.g.
    val (mainW, rest) = worksAccepted.filter(w => w._2.contains(work)).head
  • The map reduce approach to parallelizing computation is quite easy to understand and to model. Also, we can tune the parallelism factor by acting on the partitioning phase before the Map (e.g. partitioning by a criterion other than parity)
  • A major issue in developing and debugging a distributed system is maintaining correctness after a code modification, e.g. "I fixed that, is the Map phase still computing an intersection?". I found assertions in the code very useful for two reasons: you write the assertion while you are focused on that piece of code, in other words during development, and you are forced to think about exactly what to expect in terms of pre-conditions and post-conditions, splitting the responsibilities between the parts of the code. E.g.
    case WorkCompleted(work, result) ⇒
    ...
    require(subWCompleted.workId == work.workId, s"WorkStarted expected workId ${work.workId} == ${subWCompleted.workId}")
    require(!worksPending(mainW).filter(w => w == work).isEmpty, s"WorkId ${work.workId} was not expected to be in workPending")
    require(!worksAccepted.contains(mainW) || worksAccepted(mainW).filter(w => w == work).isEmpty, s"WorkId ${work.workId} was not expected to be in workIdsAccepted")
    require(!worksDone.contains(mainW) || worksDone(mainW).filter(w => w == work).isEmpty, s"WorkId ${work.workId} was not expected to be in workIdsDone")
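Coming back to the remark above about tuning the parallelism factor in the partitioning phase: one possible criterion other than parity is the first node value modulo k, which yields k Map inputs instead of two. The sketch below is an assumption-laden illustration, not the article's code: nodes are integers, the Map calls run sequentially, and the names KWayPartition, partition and intersect are hypothetical. It stays correct for the same reason as the parity split, since a given arc lands in the same bucket in every graph.

```scala
object KWayPartition {
  // Assumption: an arc is a pair (first node, second node) of integer node values.
  type Arc = (Int, Int)

  // Split arcs into k buckets by (first node mod k); k = 2 reproduces the parity split.
  def partition(arcs: Set[Arc], k: Int): Vector[Set[Arc]] = {
    require(k > 0, "k must be positive")
    // The double modulo keeps the bucket index non-negative for negative node values.
    val byBucket = arcs.groupBy { case (first, _) => ((first % k) + k) % k }
    Vector.tabulate(k)(i => byBucket.getOrElse(i, Set.empty[Arc]))
  }

  def intersect(graphs: Seq[Set[Arc]], k: Int): Set[Arc] = {
    // One row per graph, k buckets per row.
    val perGraph = graphs.map(arcs => partition(arcs, k))
    // One Map call per bucket: intersect the i-th bucket of every graph.
    val partials = (0 until k).map { i =>
      val column = perGraph.map(buckets => buckets(i))
      if (column.isEmpty) Set.empty[Arc]
      else column.tail.foldLeft(column.head)(_ intersect _)
    }
    // Reduce: union of the partial intersections.
    partials.foldLeft(Set.empty[Arc])(_ union _)
  }
}
```

A larger k gives more, smaller Map inputs, which only pays off when there are enough Workers to process them in parallel.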

Future Improvements: just a few ideas

Akka is a vast framework with many features not covered in this article. In this section I give a couple of ideas to improve scalability and robustness.

  • using Akka Persistence we can store the state of an Actor in non-volatile memory. For instance, by persisting the state of the WebServer actor we can restore the messages not yet sent to the Master, avoiding the loss of client requests in case of a temporary failover
  • using many Master and WebServer instances spread across different machines we can achieve a more responsive, resilient and elastic system [2]

Happy coding!

Download the code

Download akka-distributed-workers.tar.zip

References

[1] - http://doc.akka.io/docs/akka/2.4/scala.html

[2] - http://www.reactivemanifesto.org/

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)