Introduction
This project provides an API to describe, run, and manage a custom workflow for the Amazon Elastic MapReduce (EMR) service.
With this API you can:
- start and configure new EMR jobs;
- add steps to a running job;
- rerun steps if required;
- control and change the sequence of the steps at runtime;
- control the lifetime of the cluster.
It also lets you store the job flow’s template in XML files, similar to an Oozie workflow,
and it supports placeholders that are resolved at runtime.
The solution targets Microsoft .NET Framework 4.5 and includes three projects:
- AwsEmrWorkflow.dll, which exposes the API described above;
- a unit tests’ project for the API;
- a demo project for the API.
You can always download the latest version with fixes from my website:
supperslonic.com
To build and run it, you need to install the AWS SDK for .NET.
Data model
To describe an EMR job flow, the API provides:
- an object model – a collection of classes that describes an EMR job;
- an XSD schema for the XML model.
The two are interchangeable and easily extendable.
The object model can be serialized into the XML model, and the XML model can be deserialized into the object model.
You can also use a hybrid of the two by building your final object like “Lego” from different pieces: XML files or objects.
An example of the job flow in an xml-file:
<?xml version="1.0" encoding="utf-16"?>
<jobFlow xmlns="urn:supperslonic:emrWorkflow">
  <name>Job1-by-{userName}</name>
  <logUri>{myBucket}/logs</logUri>
  <ec2KeyName>testEC2Key</ec2KeyName>
  <amiVersion>3.0.3</amiVersion>
  <hadoopVersion>2.2.0</hadoopVersion>
  <masterInstanceType>m1.medium</masterInstanceType>
  <slaveInstanceType>m3.2xlarge</slaveInstanceType>
  <instanceCount>34</instanceCount>
  <config>
    <hadoopConfig>
      <arg>-s</arg>
      <arg>mapreduce.user.classpath.first=true</arg>
    </hadoopConfig>
    <hBaseConfig start="true">
      <jar>/home/hadoop/lib/hbase-0.94.7.jar</jar>
      <arg>--site-config-file</arg>
      <arg>{myBucket}/hBase/config.xml</arg>
      <hBaseDaemondsConfig>
        <arg>--hbase-master-opts=-Xmx6140M -XX:NewSize=64m</arg>
        <arg>--regionserver-opts=-XX:MaxNewSize=64m -XX:+HeapDumpOnOutOfMemoryError</arg>
      </hBaseDaemondsConfig>
    </hBaseConfig>
  </config>
  <bootstrapActions>
    <bootstrapAction>
      <name>bootstrap action 1</name>
      <path>{myBucket}/bootstrap/UploadLibraries.sh</path>
    </bootstrapAction>
  </bootstrapActions>
  <steps>
    <restoreHBase path="{myBucket}/hBaseRestore" />
    <jarStep>
      <name>step 1</name>
      <jar>{myBucket}/jars/test.jar</jar>
      <actionOnFailure>CANCEL_AND_WAIT</actionOnFailure>
      <mainClass>com.supperslonic.emr.Step1Driver</mainClass>
      <arg>true</arg>
      <arg>12.34</arg>
    </jarStep>
    <backupHBase path="{myBucket}/hBaseBackup" />
    <jarStep>
      <name>step 2</name>
      <jar>{myBucket}/jars/test2.jar</jar>
    </jarStep>
  </steps>
</jobFlow>
The model contains the following sections:
Job flow description, where you set the job’s name, log location, instance types, tags, etc.
Configuration description, where you configure your Hadoop, HBase, and debug settings.
The following configurations are supported:
- Hadoop configuration;
- HBase configuration;
- HBase Daemons configuration;
- Debug configuration.
Bootstrap actions, where you can specify any number of custom bootstrap actions.
Steps, where you can specify any number of steps. The following types are supported:
- Custom jar step;
- Restore HBase step;
- Backup HBase step.
Placeholders
The model also supports custom placeholders that are resolved at runtime.
To use them, define the replacement values by populating an instance of the BuilderSettings class,
which is used while building any EMR service request.
There are two reserved settings:
- jobFlowId – identifies the current job.
Populated either automatically when the job starts or manually by the user.
- hBaseJarPath – specifies the current HBase version.
Populated either automatically from the HBase configuration or manually by the user.
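The replacement step can be pictured as a lookup-and-substitute pass over each string of the model. The following is only a minimal sketch, not the library's BuilderSettings implementation: PlaceholderResolver and Resolve are hypothetical names, the {key} syntax is taken from the XML example above, and leaving unknown placeholders untouched is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical helper that illustrates "{key}" substitution only.
// It is NOT the BuilderSettings class from the library.
public static class PlaceholderResolver
{
    // Matches "{name}" where name consists of letters, digits or underscores.
    private static readonly Regex Pattern = new Regex(@"\{(\w+)\}");

    public static string Resolve(string template, IDictionary<string, string> settings)
    {
        if (template == null) throw new ArgumentNullException("template");
        if (settings == null) throw new ArgumentNullException("settings");

        return Pattern.Replace(template, delegate(Match match)
        {
            string key = match.Groups[1].Value;
            string value;
            // Assumption: an unknown placeholder is left as-is instead of failing.
            return settings.TryGetValue(key, out value) ? value : match.Value;
        });
    }
}
```

With a dictionary that maps myBucket to s3://myBucket/emr, calling PlaceholderResolver.Resolve("{myBucket}/logs", settings) yields s3://myBucket/emr/logs.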
API structure
Runner – iterates over the list of strategies provided by the user and verifies the current status of the job.
Strategy – sends a specific request to the Amazon EMR service.
Builder – builds a specific request for the Amazon EMR service from the provided object model.
Visitor – the main layer of abstraction between the EMR data model and the API’s data model. It separates the algorithm that builds a specific request from the structure of the API’s object model.
Serialization implementation
All serialization support is implemented in the base class EmrWorkflowItemBase, which exposes virtual methods that concrete classes override according to their own requirements.
There are also several XML factories that can serialize and deserialize collections of objects: StepsXmlFactory, ConfigsXmlFactory, BootstrapActionsXmlFactory, and TagsXmlFactory.
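To make the round trip between the XML model and the object model concrete, here is a minimal sketch built on System.Xml.Linq. It does not reproduce the library's factories: JarStepRecord and StepsXmlSketch are hypothetical names, only the name and jar elements are handled, and a steps root element in the urn:supperslonic:emrWorkflow namespace is assumed.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Hypothetical, reduced stand-in for a jar step of the object model.
public sealed class JarStepRecord
{
    public string Name { get; set; }
    public string Jar { get; set; }
}

// Hypothetical factory: reads and writes a <steps> collection.
public static class StepsXmlSketch
{
    private static readonly XNamespace Ns = "urn:supperslonic:emrWorkflow";

    // XML model -> object model
    public static IList<JarStepRecord> ReadXml(string xml)
    {
        XElement root = XElement.Parse(xml);
        return root.Elements(Ns + "jarStep")
            .Select(e => new JarStepRecord
            {
                // The explicit cast returns null when the element is missing.
                Name = (string)e.Element(Ns + "name"),
                Jar = (string)e.Element(Ns + "jar")
            })
            .ToList();
    }

    // Object model -> XML model
    public static string WriteXml(IEnumerable<JarStepRecord> steps)
    {
        XElement root = new XElement(Ns + "steps",
            steps.Select(s => new XElement(Ns + "jarStep",
                new XElement(Ns + "name", s.Name),
                new XElement(Ns + "jar", s.Jar))));
        return root.ToString(SaveOptions.DisableFormatting);
    }
}
```

Reading the output of WriteXml back through ReadXml returns records with the same names and jar paths, which is the interchangeability property described above.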
Visitor implementation
The visitor design pattern was chosen to process the API’s data-model structure. It fully decouples any processing algorithm from the objects’ structure and makes the API easy to extend. A visitor implements the IEmrWorkflowItemVisitor interface, and every object in the API’s data model accepts it.
The BuildRequestVisitor class is a concrete implementation of IEmrWorkflowItemVisitor that creates parts of the EMR service request from the visited data. It is entirely decoupled from the request-building process: it only notifies observers, by raising an event, that a part of the request has been created.
The following events are supported:
- OnRunJobFlowRequestCreated;
- OnJobFlowInstancesConfigCreated;
- OnTagCreated;
- OnBootstrapActionConfigCreated;
- OnStepConfigCreated.
The BuildRequestVisitor class is also responsible for replacing placeholders while creating the EMR service request.
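The combination of double dispatch and events can be sketched as follows. This is a simplified illustration rather than the library's code: WorkflowItem, TagItem, StepItem, EventRaisingVisitor and FragmentEventArgs are hypothetical names, and request fragments are plain strings instead of AWS SDK objects. Only the two event names are borrowed from the list above.

```csharp
using System;

// Hypothetical, simplified stand-ins for the API's data-model classes.
public interface IWorkflowItemVisitor
{
    void Visit(TagItem tag);
    void Visit(StepItem step);
}

public abstract class WorkflowItem
{
    // Each item accepts a visitor and dispatches to the matching overload.
    public abstract void Accept(IWorkflowItemVisitor visitor);
}

public sealed class TagItem : WorkflowItem
{
    public string Key { get; set; }
    public string Value { get; set; }
    public override void Accept(IWorkflowItemVisitor visitor) { visitor.Visit(this); }
}

public sealed class StepItem : WorkflowItem
{
    public string Name { get; set; }
    public override void Accept(IWorkflowItemVisitor visitor) { visitor.Visit(this); }
}

public sealed class FragmentEventArgs : EventArgs
{
    public FragmentEventArgs(string fragment) { Fragment = fragment; }
    public string Fragment { get; private set; }
}

// Turns each visited item into a request fragment and announces it through
// an event. The visitor does not know who assembles the final request.
public sealed class EventRaisingVisitor : IWorkflowItemVisitor
{
    public event EventHandler<FragmentEventArgs> OnTagCreated;
    public event EventHandler<FragmentEventArgs> OnStepConfigCreated;

    public void Visit(TagItem tag)
    {
        Raise(OnTagCreated, tag.Key + "=" + tag.Value);
    }

    public void Visit(StepItem step)
    {
        Raise(OnStepConfigCreated, "step:" + step.Name);
    }

    private void Raise(EventHandler<FragmentEventArgs> handler, string fragment)
    {
        // No subscribers means nothing to notify.
        if (handler != null) handler(this, new FragmentEventArgs(fragment));
    }
}
```

Adding a new processing algorithm then means writing another IWorkflowItemVisitor implementation, while the item classes stay unchanged.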
Builder implementation
The builder is responsible for building the final EMR service request. Internally, it uses a visitor to traverse the provided object structure and subscribes to the visitor’s events to assemble the request.
The API contains two builders:
- RunJobFlowRequestBuilder – builds a request to start and configure an EMR job from a JobFlow instance;
- AddJobFlowStepsRequestBuilder – builds a request to add new steps to a running job from a list of StepBase instances.
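How a builder assembles a request purely by listening to visitor events can be sketched like this. All types here (StepVisitor, AddStepsRequest, AddStepsRequestBuilder) are hypothetical and reduced to strings; the real builders produce AWS SDK request objects, which this sketch does not attempt to model.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a visitor that announces each step it has converted.
public sealed class StepVisitor
{
    public event Action<string> OnStepConfigCreated;

    public void Visit(string stepName)
    {
        Action<string> handler = OnStepConfigCreated;
        if (handler != null) handler("config-for-" + stepName);
    }
}

// Hypothetical stand-in for the final request sent to the service.
public sealed class AddStepsRequest
{
    public AddStepsRequest() { Steps = new List<string>(); }
    public string JobFlowId { get; set; }
    public List<string> Steps { get; private set; }
}

public sealed class AddStepsRequestBuilder
{
    public AddStepsRequest Build(string jobFlowId, IEnumerable<string> stepNames)
    {
        AddStepsRequest request = new AddStepsRequest { JobFlowId = jobFlowId };
        StepVisitor visitor = new StepVisitor();

        // The builder only reacts to events; how a step becomes a config
        // stays inside the visitor.
        visitor.OnStepConfigCreated += request.Steps.Add;

        foreach (string name in stepNames)
            visitor.Visit(name);

        return request;
    }
}
```

Because the builder depends only on the events, the conversion logic inside the visitor can change without touching the builder.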
Strategy implementation
A strategy design pattern is used to send different types of requests to the EMR service. The EmrActivityStrategy class hides the algorithm for building and sending a specific request.
This approach splits the job flow into logical pieces (activities), which gives you more control over the sequence and behavior of the job flow. All activities, whatever their type, are treated uniformly and are interchangeable, so you can focus on designing the workflow rather than on implementing it.
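using System.Threading.Tasks;
using Amazon.ElasticMapReduce.Model;
// The AwsEmrWorkflow namespaces must be imported as well.

public class StartJobStrategy : EmrActivityStrategy
{
    private JobFlow jobFlow;

    public StartJobStrategy(string name, JobFlow jobFlow)
        : base(name)
    {
        this.jobFlow = jobFlow;
    }

    public override async Task<bool> PushAsync(EmrJobRunner emrJobRunner)
    {
        // Build the request from the object model, using the runner's settings
        RunJobFlowRequestBuilder builder = new RunJobFlowRequestBuilder(emrJobRunner.Settings);
        RunJobFlowRequest request = builder.Build(this.jobFlow);

        // Send the request without blocking the calling thread
        RunJobFlowResponse response = await emrJobRunner.EmrClient.RunJobFlowAsync(request);
        if (!this.IsOk(response))
            return false;

        // Remember the id of the started job for the following activities
        emrJobRunner.JobFlowId = response.JobFlowId;
        return true;
    }
}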
Inside each EmrActivityStrategy, all requests are sent using the task-based asynchronous pattern. The calling thread is not blocked while a request is in flight: it returns to the thread pool immediately and can pick up other pending work.
To learn more about the task-based asynchronous pattern, please see Task-based Asynchronous Pattern (TAP) on MSDN.
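As a small, self-contained illustration of the pattern (not code from the library), the sketch below uses Task.Delay as a stand-in for a network call. TapSketch and its method names are hypothetical. Both simulated requests are started before either is awaited, so no thread sits idle while they are pending.

```csharp
using System.Threading.Tasks;

// Hypothetical sketch: Task.Delay simulates the latency of a remote call.
public static class TapSketch
{
    public static async Task<string> SendRequestAsync(string name)
    {
        // While the delay is pending, no thread is blocked on it.
        await Task.Delay(100);
        return name + ":done";
    }

    public static async Task<string[]> SendBothAsync()
    {
        // Start both operations first, then await their completion together.
        Task<string> first = SendRequestAsync("start");
        Task<string> second = SendRequestAsync("addSteps");
        return await Task.WhenAll(first, second);
    }
}
```

Task.WhenAll returns the results in the order of the tasks passed to it, regardless of which one finishes first.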
Runner implementation
The EmrJobRunner class is responsible for orchestrating the EMR activities and checking the status of a job.
To iterate through the list of activities, it uses the abstract class EmrActivitiesEnumerator. A user implements up to two methods of this class:
- GetNormalFlow – returns the list of activities to execute in the normal flow;
- GetFailedFlow – optional. Returns an alternative list of activities to execute if an error occurs.
The list of activities can be either a predefined sequence or the result of arbitrarily complex logic that selects the next activity.
The switch to the failed flow happens automatically when EmrJobRunner notifies the iterator of an error, but it is still up to the user to decide what to do: run an alternative sequence of activities or stop iterating and terminate the job.
using System;
using System.Collections.Generic;
using System.Xml;
// The AwsEmrWorkflow namespaces must be imported as well.

public class DemoEmrActivitiesEnumerator : EmrActivitiesEnumerator
{
    protected override IEnumerable<EmrActivityStrategy> GetNormalFlow(EmrJobRunner emrRunner)
    {
        // Start a new job only if the runner is not attached to an existing one
        if (String.IsNullOrEmpty(emrRunner.JobFlowId))
            yield return this.CreateStartActivity();

        yield return this.CreateAddStepsActivity();
        yield return new TerminateJobStrategy("Job succeeded. terminate cluster");
    }

    protected override IEnumerable<EmrActivityStrategy> GetFailedFlow(EmrJobRunner emrRunner)
    {
        yield return new TerminateJobStrategy("Job failed. terminate cluster");
    }

    private EmrActivityStrategy CreateStartActivity()
    {
        // Deserialize the job flow template from its XML file
        XmlDocument jobFlowXml = new XmlDocument();
        jobFlowXml.Load("Workflow/JobConfig.xml");
        JobFlow jobFlow = JobFlow.GetRecord(jobFlowXml.OuterXml);
        return new StartJobStrategy("start and configure job", jobFlow);
    }

    private EmrActivityStrategy CreateAddStepsActivity()
    {
        // Deserialize the list of steps from its XML file
        XmlDocument stepsXml = new XmlDocument();
        stepsXml.Load("Workflow/Steps.xml");
        IList<StepBase> steps = new StepsXmlFactory().ReadXml(stepsXml.OuterXml);
        return new AddStepsStrategy("first activity", steps);
    }
}
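The automatic switch from the normal flow to the failed flow can be pictured with the reduced model below. It is a sketch under stated assumptions, not the library's EmrActivitiesEnumerator: ActivitiesEnumeratorSketch, DemoFlow, NotifyFailed and Next are hypothetical names, activities are plain strings, and enumerator disposal is omitted for brevity.

```csharp
using System.Collections.Generic;

// Hypothetical, simplified iterator: activities are plain strings here.
public abstract class ActivitiesEnumeratorSketch
{
    private IEnumerator<string> current;
    private bool failed;

    protected abstract IEnumerable<string> GetNormalFlow();

    // Optional: by default there is nothing to run after a failure.
    protected virtual IEnumerable<string> GetFailedFlow()
    {
        yield break;
    }

    // Called by the runner when an activity or the job reports an error.
    public void NotifyFailed()
    {
        if (failed) return; // switch only once
        failed = true;
        current = GetFailedFlow().GetEnumerator();
    }

    // Returns the next activity, or null when the active flow is exhausted.
    public string Next()
    {
        if (current == null)
            current = GetNormalFlow().GetEnumerator();
        return current.MoveNext() ? current.Current : null;
    }
}

public sealed class DemoFlow : ActivitiesEnumeratorSketch
{
    protected override IEnumerable<string> GetNormalFlow()
    {
        yield return "start job";
        yield return "add steps";
        yield return "terminate (success)";
    }

    protected override IEnumerable<string> GetFailedFlow()
    {
        yield return "terminate (failure)";
    }
}
```

Because both flows are iterator methods, the remaining normal activities are never even created once the failure is reported.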
Internally, EmrJobRunner uses a System.Threading.Timer to call the CheckStatus method, which checks the job’s status and pushes new activities if required. This method is thread-safe: only one thread at a time can execute it. A primitive user-mode synchronization construct at the entrance of the method simply rejects other threads until the current call is done.
The timer is used on purpose, to avoid a construction like "... while(checkStatus) thread.sleep ..."
Thread.Sleep is fine for demo purposes, when you don’t care about resources, but not for a production design. Even though a sleeping thread gets no CPU time from the scheduler, it is still inefficient: instead of doing other work, the thread is parked, and the thread pool is forced to create new worker threads, which can eventually result in many threads and constant context switching.
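One common way to build such a rejecting guard is an integer flag claimed with Interlocked.CompareExchange. The sketch below assumes that technique; the source does not say which primitive EmrJobRunner actually uses. StatusPoller, Tick and RejectedCalls are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a timer-driven status check with a non-blocking guard.
public sealed class StatusPoller : IDisposable
{
    private readonly Action checkStatus;
    private Timer timer;
    private int busy;     // 0 = free, 1 = a check is in progress
    private int rejected; // number of calls turned away by the guard

    public StatusPoller(Action checkStatus)
    {
        if (checkStatus == null) throw new ArgumentNullException("checkStatus");
        this.checkStatus = checkStatus;
    }

    public int RejectedCalls
    {
        get { return Volatile.Read(ref rejected); }
    }

    public void Start(TimeSpan period)
    {
        // The timer fires on thread-pool threads; callbacks may overlap.
        timer = new Timer(delegate { Tick(); }, null, TimeSpan.Zero, period);
    }

    // Public so that the guard can be exercised directly.
    public void Tick()
    {
        // Atomically claim the flag; a caller that finds it set leaves at once
        // instead of waiting, so no thread is ever blocked here.
        if (Interlocked.CompareExchange(ref busy, 1, 0) != 0)
        {
            Interlocked.Increment(ref rejected);
            return;
        }

        try
        {
            checkStatus();
        }
        finally
        {
            // Release the flag even if the check throws.
            Interlocked.Exchange(ref busy, 0);
        }
    }

    public void Dispose()
    {
        if (timer != null) timer.Dispose();
    }
}
```

Unlike a lock statement, this guard drops overlapping calls rather than queuing them, which suits a periodic check: the next timer tick will look at the status again anyway.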
How to use
To use the API, you need to do two things:
- Define a sequence of activities by implementing the abstract class EmrActivitiesEnumerator;
- Call EmrJobRunner to run the activities.
using System;
using System.Threading;
using Amazon;
using Amazon.ElasticMapReduce;
// The AwsEmrWorkflow namespaces must be imported as well.

public class Program
{
    public static void Main(string[] args)
    {
        BuilderSettings settings = Program.CreateSettings();
        AmazonElasticMapReduceClient emrClient = Program.CreateEmrClient();
        DemoEmrActivitiesEnumerator activitiesIterator = new DemoEmrActivitiesEnumerator();

        using (EmrJobRunner emrRunner = new EmrJobRunner(settings, emrClient, activitiesIterator))
        {
            emrRunner.Run();

            // Keep the demo process alive until the runner has finished
            while (emrRunner.IsRunning)
            {
                Thread.Sleep(5000);
            }
        }
    }

    public static BuilderSettings CreateSettings()
    {
        // Values for the placeholders used in the workflow templates
        BuilderSettings settings = new BuilderSettings();
        settings.Put("s3Bucket", "s3://myBucket/emr");
        return settings;
    }

    public static AmazonElasticMapReduceClient CreateEmrClient()
    {
        // Fill in your AWS credentials
        String accessKey = "";
        String secretKey = "";
        return new AmazonElasticMapReduceClient(accessKey, secretKey, RegionEndpoint.USEast1);
    }
}