Observer(C#) Pattern with IObservable and IObserver Interfaces

9 Feb 2015 | CPOL
A real example that shows how to implement the Observer pattern using IObservable<T> and IObserver<T>

Introduction

This is a console application that demonstrates a real example of how we implemented IObservable<T> and IObserver<T>.

Steps to Run the Solution

  1. The ObserverPattern attachment contains a CustomerList.txt file. Copy it to C:\. If you decide to place it somewhere else, please update the App.config in the Provider project.
  2. Run the FakeElasticSearch Web API.

    What is the FakeElasticSearch?

    As we don't have access to the real Elastic Search link from here, I needed to develop a FakeElasticSearch API to make this work, and you need to run it. The Provider will start posting requests to that link, which is why it's important to have the FakeElasticSearch API running. The Observer will send GET requests to FakeElasticSearch in order to retrieve the event date.

    I have enabled NuGet Package Restore, so the NuGet package manager will be able to find and add any missing assemblies.

    At the end of the process, a .tsv file will be generated in the specified location. Again, if you need to change that, please update the App.config.

  3. Run the ObserverPattern solution.

The Scenario

We were posting events to the EventHub through a Web API, and we wanted to measure how long it took for a message to be added to and retrieved from the EventHub. To retrieve that time, we needed to query Elastic Search, passing in the query string the token that we had initially posted through the Web API.

We needed to automate this flow by triggering thousands of requests, and at the completion, we needed to export data to a TSV file.

The file would include three columns: Token, RequestDateStamp, and EventDateStamp.
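As a rough sketch of what one row of that file carries, the helper below joins the three columns with tabs and derives the elapsed time between the two timestamps. The class and method names are hypothetical and do not appear in the attached solution. It also assumes that both timestamps are stored in the round-trip ("o") format, as the Provider code later in the article does for RequestTimeStamp.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, for illustration only.
public static class LatencyReport
{
    // Parses a round-trip ("o") timestamp, e.g. 2015-02-09T10:00:00.0000000+00:00.
    public static DateTimeOffset ParseStamp(string stamp)
    {
        return DateTimeOffset.ParseExact(stamp, "o", CultureInfo.InvariantCulture);
    }

    // Time between the request being posted and the event being recorded.
    public static TimeSpan Elapsed(string requestStamp, string eventStamp)
    {
        return ParseStamp(eventStamp) - ParseStamp(requestStamp);
    }

    // One tab-separated row: Token, RequestDateStamp, EventDateStamp.
    public static string ToTsvLine(string token, string requestStamp, string eventStamp)
    {
        return string.Join("\t", token, requestStamp, eventStamp);
    }
}
```

Keeping the raw timestamps in the file and computing the difference afterwards means the export stays a plain record of what was observed.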

I could have done it using two console applications. One would be responsible for posting to the Web API, and the second would be responsible for reading from Elastic Search using the token.

That could be a solution, but the .NET Framework ships with a number of interfaces that can help us deliver solutions with more elegant and robust code.

I decided to implement IObserver<T> and IObservable<T>. For this scenario, I felt that it was the perfect solution.

Theory

As explained on MSDN at https://msdn.microsoft.com/en-us/library/dd990377%28v=vs.110%29.aspx,

The IObserver<T> and IObservable<T> interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The IObservable<T> interface represents the class that sends notifications (the provider); the IObserver<T> interface represents the class that receives them (the observer). T represents the class that provides the notification information. In some push-based notifications, the IObserver<T> implementation and T can represent the same type.

The provider sends notifications to the observer, but before it can do that, the observer needs to subscribe to the provider to indicate that it wants to receive push-based notifications.

So basically, we have:

  1. IObservable<T> (Provider)
  2. IObserver<T> (Observer)

From now on, we will refer to these two as the Provider and the Observer.

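The observer exposes the following methods: OnNext, which hands the observer a new piece of data; OnError, which tells it that the provider has hit an error condition; and OnCompleted, which tells it that the provider has finished sending notifications.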

The provider must implement a single method, Subscribe, that indicates that an observer wants to receive push-based notifications.
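Before looking at the real classes, the contract can be seen in a stripped-down form. The following is only a minimal sketch: NumberProvider, CollectingObserver, and Publish are hypothetical names used for illustration and are not part of the attached solution.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider: pushes integers to every subscribed observer.
public class NumberProvider : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    // Registers the observer and returns a handle that unsubscribes it on Dispose.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (!_observers.Contains(observer))
            _observers.Add(observer);
        return new Subscription(_observers, observer);
    }

    // Pushes each value through OnNext, then signals the end with OnCompleted.
    public void Publish(IEnumerable<int> values)
    {
        foreach (var value in values)
            foreach (var observer in _observers.ToArray())
                observer.OnNext(value);

        foreach (var observer in _observers.ToArray())
            observer.OnCompleted();
    }

    private sealed class Subscription : IDisposable
    {
        private readonly List<IObserver<int>> _observers;
        private readonly IObserver<int> _observer;

        public Subscription(List<IObserver<int>> observers, IObserver<int> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose()
        {
            _observers.Remove(_observer);
        }
    }
}

// Hypothetical observer: records everything it is told.
public class CollectingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public bool Completed { get; private set; }
    public Exception LastError { get; private set; }

    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { LastError = error; }
    public void OnCompleted() { Completed = true; }
}
```

Subscribing a CollectingObserver and calling Publish(new[] { 1, 2, 3 }) leaves it with three recorded values and Completed set to true. Disposing the object returned by Subscribe stops any further notifications from reaching that observer.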

Basically, I needed a Provider that would be responsible for posting requests to the API. The Provider would then forward the notification to the Observer. The Observer, in turn, would query Elastic Search, retrieve the EventDate, deserialize the response, and ultimately export the data to a TSV file.

Using the Code

C#
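using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public class CustomersApiClient : IObservable<Request>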
{
        private IList<IObserver<Request>> observers;

        public CustomersApiClient()
        {
            observers = new List<IObserver<Request>>();
        }

        public IDisposable Subscribe(IObserver<Request> observer)
        {
            if (!observers.Contains(observer))
                observers.Add(observer);
            return new Unsubscriber(observers, observer);
        }

        private class Unsubscriber : IDisposable
        {
            private IList<IObserver<Request>> _observers;
            private IObserver<Request> _observer;

            public Unsubscriber
            (IList<IObserver<Request>> observers, IObserver<Request> observer)
            {
                _observers = observers;
                _observer = observer;
            }

            public void Dispose()
            {
                if (_observer != null && _observers.Contains(_observer))
                    _observers.Remove(_observer);
            }
        }

        public void SendRequest(IList<Customer> customers)
        {
            var requests = PostRequest(customers);
            ProcessRequests(requests);
        }

        // Posts one request per customer and returns the records to be tracked.
        private List<Request> PostRequest(IList<Customer> customers)
        {
            var requests = new List<Request>();
            var pendingPosts = new List<Task>();
            var customerApiBaseAddress =
                ConfigurationManager.AppSettings["CustomerApiBaseAddress"];
            var customerApiPostAddress =
                ConfigurationManager.AppSettings["CustomerApiBasePostAddress"];

            // One HttpClient is shared by all posts instead of creating one per customer.
            using (var client = new HttpClient())
            {
                client.BaseAddress = new Uri(customerApiBaseAddress);

                foreach (var customer in customers)
                {
                    requests.Add(new Request()
                    {
                        CustomerId = customer.CustomerId,
                        RequestTimeStamp = DateTimeOffset.UtcNow.ToString("o")
                    });
                    pendingPosts.Add(client.PostAsync(customerApiPostAddress, null));
                }

                // Wait for every post to finish before the client is disposed.
                Task.WaitAll(pendingPosts.ToArray());
            }

            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("CustomerApiClient-Provider has posted all the requests!");

            // Short pause before the Observer starts querying, kept from the original flow.
            Thread.Sleep(TimeSpan.FromSeconds(3));
            return requests;
        }

        private void ProcessRequests(IList<Request> requests)
        {
            var observer = observers.First();

            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine
            ("FakeElasticSearchClient-Observer has been notified for the coming requests.");
            Console.WriteLine();
            Console.WriteLine
            ("FakeElasticSearchClient-Observer is processing requests to produce valid responses..");

            var task = new Task(() => requests.ToList().ForEach(ProcessRequest));

            task.Start();

            task.ContinueWith((con) =>
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.WriteLine("FakeElasticSearchClient-Observer will start exporting to TSV file!");
                observer.OnCompleted();
            });
        }

        private void ProcessRequest(Request request)
        {
            var observer = observers.First();
            observer.OnNext(request);
        }
 }

And the Observer:

The Provider notifies the Observer by calling its OnNext method. The Observer then sends GET requests to the FakeElasticSearch endpoint. When the Provider signals completion, the Observer generates a .tsv file.

C#
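using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading;
using Newtonsoft.Json.Linq;

public class ElasticSearchClient : IObserver<Request>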
{
       private readonly IList<Response> _responses;

       public ElasticSearchClient()
       {
           _responses = new List<Response>();
       }

       public void OnCompleted()
       {
           Console.BackgroundColor = ConsoleColor.DarkGreen;
           Console.WriteLine("Thread ID: {0}", Thread.CurrentThread.ManagedThreadId);
           var exportToFile = ConfigurationManager.AppSettings["Export"];

           using (var writer = new StreamWriter(exportToFile))
           {
               // One tab-separated line per response; the using block closes the writer.
               foreach (var item in _responses)
                   writer.WriteLine(String.Join("\t", new[] {
                       item.CustomerId, item.RequestTimeStamp, item.ElasticSearchTimeStamp }));
           }
           Console.ForegroundColor = ConsoleColor.Yellow;
           Console.WriteLine("ElasticSearchClient-Observer has finished exporting.");
           Console.ForegroundColor = ConsoleColor.Cyan;
       }

       public void OnError(Exception error)
       {
           throw new NotImplementedException();
       }

       public void OnNext(Request request)
       {
           GetRequest(request);
       }

       private void GetRequest(Request request)
       {
           var httpClient = new HttpClient();

           var elasticSearchAddress = ConfigurationManager.AppSettings["ElasticSearchAddress"];

           var uri = string.Format(elasticSearchAddress, request.CustomerId);

           var response = httpClient.GetAsync(uri).Result;

           var content = response.Content.ReadAsStringAsync().Result;

           dynamic results = JArray.Parse(content);

           var elasticSearchTimeStamp= GetDateFromResponse(results);

           _responses.Add(new Response()
            {
                RequestTimeStamp = request.RequestTimeStamp,
                CustomerId= request.CustomerId,
                ElasticSearchTimeStamp = elasticSearchTimeStamp
           });
       }

       private string GetDateFromResponse(dynamic dynamicObject)
       {
           var request = dynamicObject[0];
           var date = request["_eventDate"];
           return date.ToString("o");
       }
 }

So what just happened? After we have posted and processed all the requests, we are ready to call the Observer's OnCompleted method. At this stage, OnCompleted is responsible for performing the export.

Another interesting feature is the following:

C#
private void ProcessRequests(IList<Request> requests)
{
    var observer = observers.First();

    Console.ForegroundColor = ConsoleColor.Yellow;
    Console.WriteLine
    ("FakeElasticSearchClient-Observer has been notified for the coming requests.");
    Console.WriteLine();
    Console.WriteLine
    ("FakeElasticSearchClient-Observer is processing requests to produce valid responses..");

    var task = new Task(() => requests.ToList().ForEach(ProcessRequest));

    task.Start();

    task.ContinueWith((con) =>
    {
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.WriteLine("FakeElasticSearchClient-Observer will start exporting to TSV file!");
        observer.OnCompleted();
    });
}

We could have posted thousands of requests. For each request, we need to perform a number of operations:

  1. We need to query the FakeElasticSearch link. In the real scenario, Elastic Search takes some minutes until records are available, so there might be a waiting time.
  2. We need to deserialize the response.

With large data sets, we could use Tasks to make this process faster. That is fine as long as we place OnCompleted inside a continuation, which ensures that OnCompleted executes only AFTER all the tasks have completed. Therefore, the execution flow between the Provider and the Observer won't be disrupted.
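The listing above runs all the requests sequentially inside a single task. As a sketch of the more parallel variant, the helper below starts one task per item and attaches a single continuation that fires once they have all finished. ParallelNotifier and NotifyAll are hypothetical names, not part of the attached solution. Note the assumption that the callbacks are thread-safe: a plain List<Response> like the one in the Observer would need a lock or a concurrent collection if OnNext were called from several threads at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class ParallelNotifier
{
    // Runs onNext for every item on the thread pool. When all of those tasks
    // have finished, calls onCompleted, or onError if any of them failed.
    public static Task NotifyAll<T>(
        IEnumerable<T> items,
        Action<T> onNext,
        Action onCompleted,
        Action<Exception> onError)
    {
        Task[] work = items
            .Select(item => Task.Run(() => onNext(item)))
            .ToArray();

        // The continuation cannot start before every task in `work` has ended.
        return Task.WhenAll(work).ContinueWith(all =>
        {
            if (all.IsFaulted)
                onError(all.Exception); // an AggregateException wrapping the failures
            else
                onCompleted();
        });
    }
}
```

In the Provider, the three callbacks would map naturally onto the Observer's OnNext, OnCompleted, and OnError methods.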

To make this more interesting, I could generate a large data file and watch what happens on multiple threads while the Provider sends batches of 15,000 requests each. At that point, you can see how multiple threads make the whole process faster.
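Splitting the customer list into batches could look something like the sketch below. Batching and InBatchesOf are hypothetical names, and the attached solution does not necessarily batch this way.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, for illustration only.
public static class Batching
{
    // Splits source into consecutive batches of at most batchSize items.
    public static IEnumerable<List<T>> InBatchesOf<T>(IEnumerable<T> source, int batchSize)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (batchSize <= 0) throw new ArgumentOutOfRangeException("batchSize");
        return Iterate(source, batchSize);
    }

    private static IEnumerable<List<T>> Iterate<T>(IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }

        // The last batch may be smaller than batchSize.
        if (batch.Count > 0)
            yield return batch;
    }
}
```

With 35,000 customers and a batch size of 15,000, this yields batches of 15,000, 15,000, and 5,000 items. Each batch is a List<T>, so it could be passed to a method that takes an IList<Customer>, such as the Provider's SendRequest.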

Points of Interest

Tasks and Continuations.

The combination of design patterns and multithreading, in the right context, can turn our apps into high-performance systems.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)