Introduction
This is a console application that demonstrates a real-world example of how we implemented IObservable<T> and IObserver<T>.
Steps to Run the Solution
- In the ObserverPattern attachment, there is a CustomerList.txt file. Place it in your C:\ location. If you decide to place it somewhere else, please update the App.config in the Provider project.
- Run the FakeElasticSearch Web API.
What is the FakeElasticSearch?
As we don't have access to the real Elastic Search link from here, I needed to develop a FakeElasticSearch API to make this work, and you need to run it. The Provider will start posting requests to that link, so it's important that the FakeElasticSearch API is running. The Observer will then send GET requests to the FakeElasticSearch in order to retrieve the date.
I have enabled NuGet Package Restore, so the NuGet package manager will be able to find and add the missing assemblies.
At the end of the process, a .tsv file will be generated in the specified location. Again, if you need to change that, please update the App.config.
- Run the ObserverPattern solution.
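As a rough guide to what those App.config edits look like, here is a sketch of an appSettings section. The key names are the ones read through ConfigurationManager.AppSettings in the code later in this article. Every value is a placeholder I made up for illustration, and the key that holds the CustomerList.txt location is not shown in the article, so it is left out. Check the App.config files in the attachment for the real entries.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <!-- Key names come from the article's code; all values are assumed placeholders. -->
    <add key="CustomerApiBaseAddress" value="http://localhost:5000/" />
    <add key="CustomerApiBasePostAddress" value="api/customers" />
    <!-- {0} is filled with the CustomerId via string.Format in the Observer. -->
    <add key="ElasticSearchAddress" value="http://localhost:5000/api/search?token={0}" />
    <!-- Location of the generated .tsv file. -->
    <add key="Export" value="C:\export.tsv" />
  </appSettings>
</configuration>
```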
The Scenario
We were posting events to the EventHub by using WebApi. We wanted to measure the time it took for a message to be added to and retrieved from the EventHub. To retrieve the time, we needed to query Elastic Search, adding to the query string the token that we initially posted through the WebApi.
We needed to automate this flow by triggering thousands of requests, and at the completion, we needed to export data to a TSV file.
The file would include three columns (Token, RequestDateStamp, EventDateStamp).
I could have done it using two console applications: one responsible for posting to the Web API, and a second responsible for reading from Elastic Search using the token.
That could be a solution, but the .NET Framework ships with a number of interfaces that can help us deliver solutions with more elegant and robust code.
I decided to implement IObserver<T> and IObservable<T>. For this scenario, I felt that it was the perfect solution.
Theory
As explained on MSDN at https://msdn.microsoft.com/en-us/library/dd990377%28v=vs.110%29.aspx:
The IObserver<T> and IObservable<T> interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The IObservable<T> interface represents the class that sends notifications (the provider); the IObserver<T> interface represents the class that receives them (the observer). T represents the class that provides the notification information. In some push-based notifications, the IObserver<T> implementation and T can represent the same type.
The provider sends notifications to the observer, but before it can do that, the observer needs to subscribe to the provider to indicate that it wants to receive push-based notifications.
So basically, we have:
IObservable<T> (Provider)
IObserver<T> (Observer)
From now on, we will refer to these two as Provider and Observer.
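The observer exposes the following methods:
- OnNext, which passes new data to the observer.
- OnError, which informs the observer that an error has occurred.
- OnCompleted, which indicates that the provider has finished sending notifications.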
The provider must implement a single method, Subscribe, that indicates that an observer wants to receive push-based notifications.
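To make the contract concrete before looking at the real classes, here is a minimal sketch of both interfaces working together. NumberProvider, Publish and CollectingObserver are hypothetical names invented for this illustration; they are not part of the attached solution.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider: pushes integers to every subscribed observer.
public class NumberProvider : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    // Registers the observer and returns a handle that unsubscribes it on Dispose.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (!_observers.Contains(observer))
            _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // Pushes each value to all observers, then signals that the sequence is over.
    public void Publish(IEnumerable<int> values)
    {
        foreach (var value in values)
            foreach (var observer in _observers.ToArray())
                observer.OnNext(value);

        foreach (var observer in _observers.ToArray())
            observer.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> _observers;
        private readonly IObserver<int> _observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose()
        {
            _observers.Remove(_observer);
        }
    }
}

// Hypothetical observer: records everything it is told.
public class CollectingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}
```

Calling Subscribe with a CollectingObserver and then Publish with a few numbers fills Received and sets Completed. Disposing the object returned by Subscribe stops further notifications.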
Basically, I needed a Provider that would be responsible for posting requests to the API. The Provider would forward the notification to the Observer. The Observer, in turn, would query Elastic Search, retrieve the EventDate, deserialize the response, and ultimately export the data to a TSV file.
Using the Code
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Linq;
    using System.Net.Http;
    using System.Threading;
    using System.Threading.Tasks;

    public class CustomersApiClient : IObservable<Request>
    {
        private readonly IList<IObserver<Request>> observers;

        public CustomersApiClient()
        {
            observers = new List<IObserver<Request>>();
        }

        // Registers an observer and returns a handle that unsubscribes it when disposed.
        public IDisposable Subscribe(IObserver<Request> observer)
        {
            if (!observers.Contains(observer))
                observers.Add(observer);
            return new Unsubscriber(observers, observer);
        }

        private class Unsubscriber : IDisposable
        {
            private readonly IList<IObserver<Request>> _observers;
            private readonly IObserver<Request> _observer;

            public Unsubscriber(IList<IObserver<Request>> observers, IObserver<Request> observer)
            {
                _observers = observers;
                _observer = observer;
            }

            public void Dispose()
            {
                if (_observer != null && _observers.Contains(_observer))
                    _observers.Remove(_observer);
            }
        }

        // Posts one request per customer, then hands the recorded requests to the observer.
        public void SendRequest(IList<Customer> customers)
        {
            var requests = new List<Request>();
            PostRequest(customers, requests);
            ProcessRequests(requests);
        }

        private void PostRequest(IList<Customer> customers, List<Request> requests)
        {
            // The settings and the client do not change per customer, so create them once.
            var customerApiBaseAddress =
                ConfigurationManager.AppSettings["CustomerApiBaseAddress"];
            var customerApiPostAddress =
                ConfigurationManager.AppSettings["CustomerApiBasePostAddress"];
            var client = new HttpClient { BaseAddress = new Uri(customerApiBaseAddress) };

            foreach (var customer in customers)
            {
                requests.Add(new Request()
                {
                    CustomerId = customer.CustomerId,
                    RequestTimeStamp = DateTimeOffset.UtcNow.ToString("o")
                });
                // Fire and forget: the response is not awaited here.
                client.PostAsync(customerApiPostAddress, null);
            }

            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("CustomerApiClient-Provider has posted all the requests!");
            // Give the pending posts a moment before the observer starts querying.
            Thread.Sleep(TimeSpan.FromSeconds(3));
        }

        private void ProcessRequests(IList<Request> requests)
        {
            // This sample assumes exactly one subscribed observer.
            var observer = observers.First();
            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine(
                "FakeElasticSearchClient-Observer has been notified for the coming requests.");
            Console.WriteLine();
            Console.WriteLine(
                "FakeElasticSearchClient-Observer is processing requests to produce valid responses..");

            var task = new Task(() => requests.ToList().ForEach(ProcessRequest));
            task.Start();
            // OnCompleted runs only after every request has been pushed to the observer.
            task.ContinueWith((con) =>
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.WriteLine("FakeElasticSearchClient-Observer will start exporting to TSV file!");
                observer.OnCompleted();
            });
        }

        private void ProcessRequest(Request request)
        {
            var observer = observers.First();
            observer.OnNext(request);
        }
    }
And the Observer:
The Observer is notified when the Provider calls OnNext. Then, it sends GET requests to the FakeElasticSearch endpoint. On completion, it generates a .tsv file.
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.IO;
    using System.Net.Http;
    using System.Threading;
    using Newtonsoft.Json.Linq;

    public class ElasticSearchClient : IObserver<Request>
    {
        private readonly IList<Response> _responses;

        public ElasticSearchClient()
        {
            _responses = new List<Response>();
        }

        // Called once by the Provider after the last request: exports all responses to the TSV file.
        public void OnCompleted()
        {
            Console.BackgroundColor = ConsoleColor.DarkGreen;
            Console.WriteLine("Thread ID: {0}", Thread.CurrentThread.ManagedThreadId);
            var exportToFile = ConfigurationManager.AppSettings["Export"];

            // The using block closes the writer, so no explicit Close() is needed.
            using (var writer = new StreamWriter(exportToFile))
            {
                foreach (var item in _responses)
                {
                    writer.WriteLine(String.Join("\t", new[] {
                        item.CustomerId, item.RequestTimeStamp, item.ElasticSearchTimeStamp }));
                }
            }

            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine("ElasticSearchClient-Observer has finished exporting.");
            Console.ForegroundColor = ConsoleColor.Cyan;
        }

        // Error handling is not implemented in this sample.
        public void OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        // Called by the Provider for every posted request.
        public void OnNext(Request request)
        {
            GetRequest(request);
        }

        private void GetRequest(Request request)
        {
            var httpClient = new HttpClient();
            var elasticSearchAddress = ConfigurationManager.AppSettings["ElasticSearchAddress"];
            // The configured address contains a {0} placeholder for the customer id.
            var uri = string.Format(elasticSearchAddress, request.CustomerId);

            // Blocking on .Result keeps the sample synchronous.
            var response = httpClient.GetAsync(uri).Result;
            var content = response.Content.ReadAsStringAsync().Result;
            dynamic results = JArray.Parse(content);
            var elasticSearchTimeStamp = GetDateFromResponse(results);

            _responses.Add(new Response()
            {
                RequestTimeStamp = request.RequestTimeStamp,
                CustomerId = request.CustomerId,
                ElasticSearchTimeStamp = elasticSearchTimeStamp
            });
        }

        // Reads the _eventDate field of the first element in the JSON array.
        private string GetDateFromResponse(dynamic dynamicObject)
        {
            var request = dynamicObject[0];
            var date = request["_eventDate"];
            return date.ToString("o");
        }
    }
So what just happened? After we have posted and processed all the requests, we are ready to call the Observer's OnCompleted method. At this stage, OnCompleted is responsible for performing the export.
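The Observer above leaves OnError unimplemented. As a sketch of how that third method usually fits in, a provider can catch a failure in its own work and report it through OnError instead of calling OnCompleted. SafeNotifier, NotifyAll and LoggingObserver are hypothetical names used only for this illustration, and stopping at the first failure is an assumption of mine, not something the attached solution does.

```csharp
using System;
using System.Collections.Generic;

public static class SafeNotifier
{
    // Produces one value per input and pushes it to the observer.
    // If producing a value throws, the exception is reported through OnError
    // and the sequence stops; otherwise OnCompleted is signalled at the end.
    public static void NotifyAll<TIn, TOut>(
        IObserver<TOut> observer, IEnumerable<TIn> inputs, Func<TIn, TOut> produce)
    {
        foreach (var input in inputs)
        {
            TOut value;
            try
            {
                value = produce(input);
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
                return;
            }
            observer.OnNext(value);
        }
        observer.OnCompleted();
    }
}

// Hypothetical observer that records notifications instead of throwing.
public class LoggingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(int value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.GetType().Name); }
    public void OnCompleted() { Log.Add("completed"); }
}
```

With this shape, the observer receives either OnCompleted or OnError at the end of a run, never both, so an export step placed in OnCompleted would not run on partial data.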
Another interesting feature is the following:
    private void ProcessRequests(IList<Request> requests)
    {
        var observer = observers.First();
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.WriteLine(
            "FakeElasticSearchClient-Observer has been notified for the coming requests.");
        Console.WriteLine();
        Console.WriteLine(
            "FakeElasticSearchClient-Observer is processing requests to produce valid responses..");

        var task = new Task(() => requests.ToList().ForEach(ProcessRequest));
        task.Start();
        // The continuation runs only after the task above has finished.
        task.ContinueWith((con) =>
        {
            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine("FakeElasticSearchClient-Observer will start exporting to TSV file!");
            observer.OnCompleted();
        });
    }
We could have posted thousands of requests. For each request, we need to do a number of operations:
- We need to send a request to the FakeElasticSearch link. In a real-world scenario, Elastic Search takes some minutes until records are available, so we might have a waiting time.
- We need to deserialize the response.
If you try this with big data, we could use Tasks to make the process faster. That will be fine as long as we place OnCompleted inside a continuation. This ensures that OnCompleted executes only AFTER all the tasks have completed. Therefore, the execution flow between Provider and Observer won't be disrupted.
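A sketch of that idea with several tasks instead of one: each item gets its own task, and OnCompleted sits in a continuation of Task.WhenAll, so it cannot run before the last OnNext has returned. ParallelNotifier and ConcurrentCollector are hypothetical names for this illustration. Note the assumption that the observer tolerates concurrent OnNext calls; an observer that stores results in a plain List<T>, like the one in this article, would need a thread-safe collection first.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelNotifier
{
    // Starts one task per item, each calling observer.OnNext, and returns a task
    // that calls observer.OnCompleted only after every OnNext call has finished.
    public static Task NotifyInParallel<T>(IObserver<T> observer, IEnumerable<T> items)
    {
        Task[] work = items
            .Select(item => Task.Run(() => observer.OnNext(item)))
            .ToArray();

        // OnlyOnRanToCompletion: skip OnCompleted if any OnNext call failed.
        return Task.WhenAll(work)
                   .ContinueWith(_ => observer.OnCompleted(),
                                 TaskContinuationOptions.OnlyOnRanToCompletion);
    }
}

// Hypothetical thread-safe observer used to check the ordering guarantee.
public class ConcurrentCollector : IObserver<int>
{
    private readonly ConcurrentBag<int> _items = new ConcurrentBag<int>();

    // Number of items seen at the moment OnCompleted ran; -1 until then.
    public int CountAtCompletion { get; private set; } = -1;

    public void OnNext(int value) { _items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { CountAtCompletion = _items.Count; }
}
```

Waiting on the returned task and then reading CountAtCompletion shows whether every item had been delivered by the time the completion step ran.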
To make this interesting and see what happens on multiple threads, I could generate a big data file and have the Provider send batches, where each batch consists of 15000 requests. At this point, you can see how multiple threads make the whole process faster.
Points of Interest
Tasks and Continuations.
The combination of design patterns and multithreading, in the right context, can turn our apps into high-performance systems.