Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / web / HTML

Practical Rx Simplyfying Concurrency, Scheduled Multi-Value handling and Unit Testing

5.00/5 (1 vote)
8 Jul 2015LGPL313 min read 11.8K   62  
In this article we use a real life scenario which involves observing intra-day trades.

Prelude

In the previous article on this series I used a WPF application, to demonstrate RX’s inbuilt ability for abstraction of concurrency. This facilitates simplification of unit-testing of MVVM concurrency scenarios. In this article we use a real life scenario which involves observing intra-day trades. We will use this real scenario to dig more deeper into the nuts and bolts of RX and analyse how Rx simplifies handling complex concurrent scenarios, with very little code while at the same time providing granular flexibilty of handling concurency, errors and unit testing abilities. We will leverage RX testing framework’s flexibility to test on the virtual time dimension. This is an absolute necessary element because invariably realtime data sequence or streaming time series entails a time dimension to it. Along the way we will see in practice the below features of Rx.

Functional

Where applicable avoid intricate stateful programs, using clean input/output functions over observable streams.

Less is more

ReactiveX’s operators often reduce what was once an elaborate challenge into a few lines of code.

Async error handling

Traditional try/catch is powerless for errors in asynchronous computations, but ReactiveX is equiped with proper mechanisms for handling errors.

Concurrency made easy

Observables and Schedulers in ReactiveX allow the programmer to abstract away low-level threading, synchronization, and concurrency issues.

Scenario

The power traders require an intra-day report to give them their day ahead power position. The report should output the aggregated volume per hour to a CSV file.

Sl. Requirements
1 Must be implemented as a Windows service using .Net 4.5 & C#.
2 All trade positions must be aggregated per hour (local/wall clock time). Note that for a given,day, the actual local start time of the day is 23:00 (11 pm) on the previous day. Local time is,in the GMT time zone.
3 CSV output format must be two columns, Local Time (format 24 hour HH:MM e.g. 13:00),and Volume and the first row must be a header row.
4 CSV filename must be PowerPosition_YYYYMMDD_HHMM.csv where YYYYMMDD is,year/month/day e.g. 20141220 for 20 Dec 2014 and HHMM is 24hr time hour and minutes,e.g. 1837. The date and time are the local time of extract.
5 The location of the CSV file should be stored and read from the application configuration file.
6 An extract must run at a scheduled time interval; every X minutes where the actual interval,X is stored in the application configuration file. This extract does not have to run exactly on,the minute and can be within +/- 1 minute of the configured interval.
7 It is not acceptable to miss a scheduled extract.
8 An extract must run when the service first starts and then run at the interval specified as,above.
9 It is acceptable for the service to only read the configuration when first starting and it does,not have to dynamically update if the configuration file changes. It is sufficient to require a,service restart when updating the configuration.
10 The service must provide adequate logging for production support to diagnose any issues.
11 This should handle Daylight Saving Start and End

Additional Notes

An assembly has been provided (PowerService.dll) that must be used to interface with the "trading system". A single interface is provided to retrieve power trades for a specified date. Two methods are provided, one is a synchronous implementation (IEnumerable<TradeAdapter>) and the other is asynchronous (Task<IEnumerable<TradeAdapter>>). The implementation has to use the async method. The classPowerService is the actual implementation of this service. The date argument should be the date to retrieve the power position (volume) for. The PowerTrade class contains an array of PowerPeriods for the given day. The period number starts at 1, which is the first period of the day and starts at 23:00 (11 pm) on the previous day.

Example

Given the call to PowerService.GetTrades returns the following two trade positions:

T1 => [ P1{1,100} , P2{2,100}, P3{3,100}, P4{4,100} .. P23{23,100},P24{24,100}]
T2 => [ P1{1,-50} , P2{2,-20}, P3{3,-50}, P4{4,-50} .. P23{23,100},P24{24,10}]

Expected Output

Local Time Volume
23:00 50
00:00 80
01:00 50
02:00 50
…. ….
21:00 200
22:00 110

Conceptual Analysis and Design

We can conceptualise the workflow as an infinite sequence of Timer events Tm which transforms to Observable collection of Trades.

During the transformation process there could be exceptions as shown as Error. In such situations we will need to Retry the source such that the Timer event sequence continues instead of dying out with an Error. This is necessary by design in this scenario because the specification 7 says that it is not acceptable to miss a scheduled extract.

Tm–>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k] }–>Completed
Tm–>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k]}–>Completed
Tm–>{ }–>Error—>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k] }–>Completed
Tm–>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k]}–>Completed
Tm–>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k]}–>Completed
Tm ….

All the above specifications using the conceptual design above can be implemented in as simple as 60 lines of formatted code below. Additionally the complexity of Error handling , scheduling , multi value handling , transformation and unit testing of concurrency over virtual time dimension can be easily achieved as discussed in the following sections.

C#
public void Run(IPowerService svc, IScheduler scheduler, DateTime dtrunDate, TimeZoneInfo timeZoneInfo,
 int observationIntervalInMinutes, StringBuilder sbpowerpositionLines, string csvFilePath, StreamMode streamMode = StreamMode.StreamToFile)
{
    var dateTimeHelper = new DateTimeHelper(dtrunDate,timeZoneInfo);
    _reporterDisposable = Observable.Interval(TimeSpan.FromMinutes(observationIntervalInMinutes), scheduler)
        .Select(i =>
        {
            dtrunDate = dtrunDate.AddMinutes((i + 1) * observationIntervalInMinutes);
            return Observable.FromAsync(() => svc.GetTradesAsync(dtrunDate));
        })
        .Subscribe(m =>
        {
            sbpowerpositionLines.Clear();
            sbpowerpositionLines.AppendLine("Local Time,Volume");
            m.Catch((PowerServiceException ex) =>
            {
                Log.Error(string.Format("PowerServiceException  {0}", ex.Message));
                return Observable.FromAsync(() => svc.GetTradesAsync(dtrunDate));
            })
            .Retry()
            .SelectMany(t => t.SelectMany(x => x.Periods))
            .GroupBy(g => g.Period)
            .Select(p => new {Period = p.Key, Volume = p.Sum(_ => _.Volume)})
            .Subscribe(value =>
            {
                value.Volume.Subscribe(volume =>
                {
                    sbpowerpositionLines.AppendLine(string.Format("{0},{1}",
                        dateTimeHelper.Reset(dtrunDate, timeZoneInfo).UtcIndexToLocalHourMap[value.Period], volume));
                    Log.Info(string.Format("Period {0}, Volume {1}",
                        value.Period,
                        volume));
                });
            }, delegate
            {
                sbpowerpositionLines.AppendLine("OnError");
                Log.Error("OnError");
            }, 
            async () =>
            {
                if (streamMode == StreamMode.StreamToMemory) return;
                string path = Path.Combine(csvFilePath,
                    "PowerPosition" + dtrunDate.ToString("_yyyyMMdd_") + DateTime.Now.ToString("HHmm") +
                    ".csv");
                if (Directory.Exists(csvFilePath))
                {
                    using (var stream = new StreamWriter(path))
                    {
                        await stream.WriteAsync(sbpowerpositionLines.ToString());
                        await stream.FlushAsync();
                    }
                    Log.Info("Completed " + path + "\n");
                }
                else
                {
                    Log.Error("Completed but Path " + path + " do not exist !!\n");
                }
            });
        });
}

The code above may appear a bit daunting. However if you are familiar with the Rx grammer , then actually it is not.

Instead of an intricate stateful workflow , we can relate the above scenario functionally such as we have a infinite set of timer events input representing schdules which transforms into output at the time schedules as specified above and persisted into CSV files. The other matters like scheduling , async calls, error handling are all available out of box with Rx.

All that we need to ensure in our design is that the workflow conforms with the Rx grammar. An Rx sequence follows up with zero or more OnNext and then ends up in either OnError or OnCompleted. Grammatically this can be represented as

sequence => {OnNext}{OnError || OnCompleted}

Transformation

At the first stage we created a sequence of observable timer events. We will observe on these timer events to simulate the periodic schedules which are configurable as per specification 6.

C#
Observable.Interval(TimeSpan.FromMinutes(observationIntervalInMinutes), scheduler)

Next what we want is to Transform or project the timer schedule events into Observable list of Trades. The fact that the projection involves an async call and associated exception handling; is not much of a concern because unlike traditional try/catch (which cannot handle async exceptions) Rx is equipped with proper mechanism to handle async exceptions. As shown below Observable.FromAsync will neatly take care of converting the result into an observable sequence.

You will also notice that the time is rolled over by the offset of the time Interval sequence multipled with schedule interval in minutes.

C#
.Select(i =>
    {
        dtrunDate = dtrunDate.AddMinutes((i + 1) * observationIntervalInMinutes);
        return Observable.FromAsync(() => svc.GetTradesAsync(dtrunDate));
    })

The resultant trades are projected into the OnNext in Subscribe. It is to be noted that the transformed collection of trades are themselves Observable and any exceptions emanating from the source "Observable.FromAsync" will be observable through the trades observable "m" which is input to the Outer Timer sequence Subscribe.The exception is intercepted and we can continue to proceed to the next sequence value , by normally returning an Observable.Empty. We instead make another call to "Observable.FromAsync", so as get trades incase of error or exception with the previous call. It is to be also noted that we apply a .Retry() which will infinitely try the source for successful list of Trades incase of previous error or exception

m.Catch((PowerServiceException ex) =>
    {
    Log.Error(string.Format("PowerServiceException  {0}", ex.Message));
    return Observable.FromAsync(() => svc.GetTradesAsync(dtrunDate));
})
.Retry()

As you can see below we use .SelectMany to further flatten the projection and then group the sequence of trade collections by individual period and then summing up the volume specific to each period. This is very similar to Linq to Objects , but there is a subtle difference.

Using Rx Linq Operators we are dealing with streams, while Linq Object to memory will lay out the whole set in memory. This difference will not be quite so obvious in this example, however there are cases where you are dealing with a huge set to memory say parsing a 100 gb file , this difference will become quite obvious with the side effects reflect onto the resource monitor.

You further notice that we apply yet another .Select to project the grouped periods into sequence of period and sum of all volumes in that period {P,∑v}

C#
.SelectMany(t => t.SelectMany(x => x.Periods))
.GroupBy(g => g.Period)
.Select(p => new {Period = p.Key, Volume = p.Sum(_ => _.Volume)})

The notable thing about applying Rx Linq Operators and aggregate is the fact that once you have applied aggregate ; the result also transforms into Observable, such that to obtain the value of aggregate we have to apply yet another nested .Subscribe . If you do not then Rx will lock. Is this a necessary Evil ? No absolutely not. This is how Rx breaks down parallel processing at granular level which allows the framework to handle very large blocks of memory into steaming frames where conventional Linq to Object will fail.

But just in case you feel the conventional Linq to object code is what makes things looks simpler then here is the code to Subscribe just below the .Retry()

C#
.Retry()
.Subscribe(value =>
{
    var powerPeriods = value.SelectMany(t => t.Periods).GroupBy(g => g.Period).Select(
        s => new PowerPeriod
        {
            Period = s.Key,
            Volume = s.Sum(_ => _.Volume)
        });
    foreach (var powerPeriod in powerPeriods)
    {
        sbpowerpositionLines.AppendLine(string.Format("{0},{1}",
            dateTimeHelper.UtcIndexToLocalHourMap[powerPeriod.Period], powerPeriod.Volume));
        Log.Info(string.Format("Period {0}, Volume {1}",
            dateTimeHelper.UtcIndexToLocalHourMap[powerPeriod.Period],
            powerPeriod.Volume));
    }
}

You will also notice that I’m using a DateTimeHelper to convert an UTC time index to localtime. Further this helper resets itself when the day rolls over and automatically readjusts its internal dictionary at 12:00am. This allows us one hour time before the clock shifts back or forth at Daylight saving start or end.

C#
.Subscribe(value =>
{
    value.Volume.Subscribe(volume =>
    {
        sbpowerpositionLines.AppendLine(string.Format("{0},{1}",dateTimeHelper.UtcIndexToLocalHourMap[value.Period], volume));
        Log.Info(string.Format("Period {0}, Volume {1}",value.Period,volume));
    });
}

Below async delegate is the inner OnComplete. I have introduced a StreamMode enum primarily as a technique to assist in the unit testing. Clearly when I unit test , I do not want any flush into physical files , but rather to test if the inner OnNextcomputation has finally Completed with the expected output. Otherwise this is the logical place where all the concurrent operations on the sequence has converged and there is a guarantee from Rx that no other action is pending on the sequence.

C#
async () =>
{
    if (streamMode == StreamMode.StreamToMemory) return;
    string path = Path.Combine(csvFilePath,"PowerPosition" + dtrunDate.ToString("_yyyyMMdd_") + DateTime.Now.ToString("HHmm") + ".csv");
    if (Directory.Exists(csvFilePath))
    {
        using (var stream = new StreamWriter(path))
        {
            await stream.WriteAsync(sbpowerpositionLines.ToString());
            await stream.FlushAsync();
        }
        Log.Info("Completed " + path + "\n");
    }
    else
    {
        Log.Error("Completed but Path " + path + " do not exist !!\n");
    }
}

As the specification suggests the times are to be converted to local GMT Time Zone. Clearly this implies that on the daytime saving start the clock will shift forward by 1 hour as reflected below the time shows as 2:00am instead of 1:00am.

Similarly on Daylight saving end date the clock will shift back by 1 hour exactly at 2:00am to 1:00am. This is also reflected in the report as below.

Daylight Saving End 2:00am
Period Volume
23:00 20
00:00 20
01:00 20
01:00 20
02:00 20
03:00 20
04:00 20
05:00 20
.. ..
20:00 20
21:00 20
22:00 20
Daylight Saving Start 1:00am
Period Volume
23:00 20
00:00 20
02:00 20
03:00 20
04:00 20
05:00 20
06:00 20
07:00 20
.. ..
19:00 20
20:00 20
21:00 20
Normal BST
Period Volume
23:00 20
00:00 20
01:00 20
02:00 20
03:00 20
04:00 20
05:00 20
06:00 20
.. ..
20:00 20
21:00 20
22:00 20

Unit Testing

Now we will look into aspects of Unit Testing. Although it might look quite daunting to starting with as to how to unit test nested functions and that too with sub levels running concurrently. But once you are used to the Rx Testing paradigm , testing these complex scenarios will appear as easy breeze.

As you can see below the TestScheduler that Rx provides is the most important tool which allows us to advance time into a virtual dimension, thereby allowing us to perform the time based tests.

C#
[SetUp]
Setup()
{
    _testScheduler = new TestScheduler();
    _powerService  = new Mock<IPowerservice>();
}

Aggregation Test

Here we test the basic functionality such as given two trades , we have an expected output. As you can see below that we have faked the GetTradesAsync to return a task from a completed task.

Next we plug the faked powerservice source and advance the timer of the injected scheduler into the virtual time dimension. This will in effect fire up the nested timer based events.

We test that given two trades should result as below

{T1[ {1,20} , {2,30} , {3,40}] , T2[ {1,20} , {2,30} , {3,40}] } => R[{"23:00",40},{"00:00",60},{"01:00",80}]
C#
[Test]
public void Should_Flatten_Trades_And_Aggregate_Periods_Per_Hour_LocalTime()
{
    //Arrange
    var intradayReporter = new IntraDayReporter();
    _powerService.Setup(p => p.GetTradesAsync(It.IsAny()))
    .Returns(
        Task.FromResult(CreateMockPowerTrades(It.IsAny(), 2,new[]{
                        new PowerPeriod { Period = 1, Volume = 20 },
                        new PowerPeriod { Period = 2, Volume = 30 },
                        new PowerPeriod { Period = 3, Volume = 40 }}
                        )));
        DateTime date = DateTime.ParseExact( "2011/03/28 10:42:33", "yyyy/MM/dd HH:mm:ss", CultureInfo.InvariantCulture);
        StringBuilder sb = new StringBuilder();
        TimeZoneInfo gmtTimeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time");
        const string expected = "Local Time,Volume\r\n23:00,40\r\n00:00,60\r\n01:00,80\r\n";
    //Act
    intradayReporter.Run1(_powerService.Object, _testScheduler, date, gmtTimeZoneInfo, 1, sb, It.IsAny(), IntraDayReporter.StreamMode.StreamToMemory);
    _testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
    var actual = sb.ToString();
    //Assert
    Assert.AreEqual(expected,actual);
}

Exception Handling Test

We will further see the amazing ability that Rx Test Framework in conjunction with MoQ framework which allows us to test complex concurrent time based workflow sequence which otherwise would have been near impossible.

What you see below is MOQ setup to return in sequence an exception, then a sequence of trades and subsequently another set of trades.

You will note that once the exception occurs normaly Rx will result in OnError, however we catch the exception and then return the result of an instantaneous call to GetTrades rather than awaiting for the next schedule.

This way we are able to meet the requirement 7 as to not acceptable to miss a scheduled extract. Hence we see that despite exception thrown by GetTrades we are see the expected result matching the subsequent call tp GetTrades.

When we later advance the Time by next schedule we see the next set of results matching thereby proving that earlier exception did not terminate the outer interval sequence.

C#
[Test]
public void Should_RetryOnException_And_Then_Continue_NextSchedule()
{
    //Arrange
    _retryCount = 0;
    var intradayReporter = new IntraDayReporter();
    _powerService.SetupSequence(p => p.GetTradesAsync(It.IsAny<datetime>()))
        .Throws(new PowerServiceException("Thrown from Unit Test"))
        .Returns(
            Task.FromResult(CreateMockPowerTrades(It.IsAny<datetime>(), 2, new[]{
                            new PowerPeriod { Period = 1, Volume = 20 },
                            new PowerPeriod { Period = 2, Volume = 30 },
                            new PowerPeriod { Period = 3, Volume = 40 }}
                            )))
        .Returns(
            Task.FromResult(CreateMockPowerTrades(It.IsAny<datetime>(), 3, new[]{
                            new PowerPeriod { Period = 1, Volume = 10 },
                            new PowerPeriod { Period = 2, Volume = 10 },
                            new PowerPeriod { Period = 3, Volume = 10 }}
                            )));
    DateTime date = DateTime.ParseExact("2011/03/28 10:42:33", "yyyy/MM/dd HH:mm:ss", CultureInfo.InvariantCulture);
    StringBuilder sb = new StringBuilder();
    TimeZoneInfo gmtTimeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time");
    const string expectedfirst = "Local Time,Volume\r\n23:00,40\r\n00:00,60\r\n01:00,80\r\n";
    const string expectedsecond = "Local Time,Volume\r\n23:00,30\r\n00:00,30\r\n01:00,30\r\n";
    //Act
    intradayReporter.Run1(_powerService.Object, _testScheduler, date, gmtTimeZoneInfo, 1, sb, It.IsAny<string>(), IntraDayReporter.StreamMode.StreamToMemory);
    _testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
    var actual = sb.ToString();
    //Assert
    Assert.AreEqual(expectedfirst, actual);
    //Act
    /*Advance Virtual time to next schedule */
    _testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
    actual = sb.ToString();
    //Assert
    Assert.AreEqual(expectedsecond, actual);
}

Daylight Saving Start Test

In this test we have chosen a date 29th March 2015, which is the start of Daylight saving. As because the time shifts forward by 1 hour at 1:00am in BST timezone; we will have total of 23 hours inbetween 11:00pm 28thMarch 2015 and 29th March 2015 instead of normal 24 hours. Now that the result is expected in Localtime we will need to show hour "03:00″ for the period id 3. This will indicate that the dateTimeHelper class is resetting the hour dictionary correctly.

It is to be noted that DayLightHelper attempts to resets its internal dictionary whenever the day rolls over incase the day falls in daylightsaving for the given timezone.

C#
[Test]
public void Should_Handle_24HRPreviouDay_to_22HRCurrentDay_LocalTimeOn_DaylightSaving_Start()
{
    //Arrange
    var intradayReporter = new IntraDayReporter();
    _powerService.Setup(p => p.GetTradesAsync(It.IsAny<Datetime>()))
        .Returns(
        Task.FromResult(CreateMockPowerTrades(It.IsAny<Datetime>(), 2, new[]{
                        new PowerPeriod { Period = 1, Volume = 10 },
                        new PowerPeriod { Period = 2, Volume = 10 },
                        new PowerPeriod { Period = 3, Volume = 10 },
                        new PowerPeriod { Period = 4, Volume = 10 },
                        new PowerPeriod { Period = 5, Volume = 10 },
                        new PowerPeriod { Period = 6, Volume = 10 },
                        new PowerPeriod { Period = 7, Volume = 10 },
                        new PowerPeriod { Period = 8, Volume = 10 },
                        new PowerPeriod { Period = 9, Volume = 10 },
                        new PowerPeriod { Period = 10, Volume = 10 },
                        new PowerPeriod { Period = 11, Volume = 10 },
                        new PowerPeriod { Period = 12, Volume = 10 },
                        new PowerPeriod { Period = 13, Volume = 10 },
                        new PowerPeriod { Period = 14, Volume = 10 },
                        new PowerPeriod { Period = 15, Volume = 10 },
                        new PowerPeriod { Period = 16, Volume = 10 },
                        new PowerPeriod { Period = 17, Volume = 10 },
                        new PowerPeriod { Period = 18, Volume = 10 },
                        new PowerPeriod { Period = 19, Volume = 10 },
                        new PowerPeriod { Period = 20, Volume = 10 },
                        new PowerPeriod { Period = 21, Volume = 10 },
                        new PowerPeriod { Period = 22, Volume = 10 }}
                        )));
    DateTime date = DateTime.ParseExact("2015/03/29", "yyyy/MM/dd", CultureInfo.InvariantCulture);
    StringBuilder sb = new StringBuilder();
    TimeZoneInfo gmtTimeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time");
    const string expected = "Local Time,Volume\r\n23:00,20\r\n00:00,20\r\n02:00,20\r\n03:00,20\r\n04:00,20\r\n05:00,20\r\n06:00,20\r\n07:00,20\r\n08:00,20\r\n09:00,20\r\n10:00,20\r\n11:00,20\r\n12:00,20\r\n13:00,20\r\n14:00,20\r\n15:00,20\r\n16:00,20\r\n17:00,20\r\n18:00,20\r\n19:00,20\r\n20:00,20\r\n21:00,20\r\n";
    //Act
    intradayReporter.Run1(_powerService.Object, _testScheduler, date, gmtTimeZoneInfo, 1, sb, It.IsAny<string>(), IntraDayReporter.StreamMode.StreamToMemory);
    _testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
    var actual = sb.ToString();
    //Assert
    Assert.AreEqual(expected, actual);           
}

Daylight Saving End Test

Similarly we check the scenario such that the during the DayLight Saving end, exactly at 2:00am , the clock turns back by one hour and become 1:00am. Clearly we will have 25 UTC hours between 23:00am previous day and current day 23:00 hr for this specific day. This can be tested similary like above.

C#
[Test]
public void Should_Handle_24HRPreviouDay_to_25HRCurrentDay_LocalTimeOn_DaylightSaving_End()
{
    //Arrange
    var intradayReporter = new IntraDayReporter();
    _powerService.Setup(p => p.GetTradesAsync(It.IsAny<DateTime>()))
        .Returns(
        Task.FromResult(CreateMockPowerTrades(It.IsAny<DateTime>(), 2, new[]{
                                        new PowerPeriod { Period = 1, Volume = 10 },
                                        new PowerPeriod { Period = 2, Volume = 10 },
                                        new PowerPeriod { Period = 3, Volume = 10 },
                                        new PowerPeriod { Period = 4, Volume = 10 },
                                        new PowerPeriod { Period = 5, Volume = 10 },
                                        new PowerPeriod { Period = 6, Volume = 10 },
                                        new PowerPeriod { Period = 7, Volume = 10 },
                                        new PowerPeriod { Period = 8, Volume = 10 },
                                        new PowerPeriod { Period = 9, Volume = 10 },
                                        new PowerPeriod { Period = 10, Volume = 10 },
                                        new PowerPeriod { Period = 11, Volume = 10 },
                                        new PowerPeriod { Period = 12, Volume = 10 },
                                        new PowerPeriod { Period = 13, Volume = 10 },
                                        new PowerPeriod { Period = 14, Volume = 10 },
                                        new PowerPeriod { Period = 15, Volume = 10 },
                                        new PowerPeriod { Period = 16, Volume = 10 },
                                        new PowerPeriod { Period = 17, Volume = 10 },
                                        new PowerPeriod { Period = 18, Volume = 10 },
                                        new PowerPeriod { Period = 19, Volume = 10 },
                                        new PowerPeriod { Period = 20, Volume = 10 },
                                        new PowerPeriod { Period = 21, Volume = 10 },
                                        new PowerPeriod { Period = 22, Volume = 10 },
                                        new PowerPeriod { Period = 23, Volume = 10 },
                                        new PowerPeriod { Period = 24, Volume = 10 },
                                        new PowerPeriod { Period = 25, Volume = 10 }}
                                        )));
    DateTime date = DateTime.ParseExact("2015/10/25", "yyyy/MM/dd", CultureInfo.InvariantCulture);
    StringBuilder sb = new StringBuilder();
    TimeZoneInfo gmtTimeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time");
    const string expected =
        "Local Time,Volume\r\n23:00,20\r\n00:00,20\r\n01:00,20\r\n01:00,20\r\n02:00,20\r\n03:00,20\r\n04:00,20\r\n05:00,20\r\n06:00,20\r\n07:00,20\r\n08:00,20\r\n09:00,20\r\n10:00,20\r\n11:00,20\r\n12:00,20\r\n13:00,20\r\n14:00,20\r\n15:00,20\r\n16:00,20\r\n17:00,20\r\n18:00,20\r\n19:00,20\r\n20:00,20\r\n21:00,20\r\n22:00,20\r\n";
 
    //Act
    intradayReporter.Run(_powerService.Object, _testScheduler, date, gmtTimeZoneInfo, 1, sb, It.IsAny<String>(), IntraDayReporter.StreamMode.StreamToMemory);
    _testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
    var actual = sb.ToString();
 
    //Assert
    Assert.AreEqual(expected, actual);
}

DateTimeHelper Internal Dictionary Reset Test

The specification requires the results to be in the Local Time. Clearly as we would be getting the hour offset in UTC, we will need a mechanism to convert the UTC timeoffset to a valid Local time. Also this is a functionality which can be separated from the core functionality above in line with the first principle of solid design which is separation of concern. As you will notice that each timer event offsets the time that this process has started on. The offseted time is input to the PowerService so that it can pull the correct Trades specific to the date. So for the sake of our testing there are two times which are of interest to us. These are 1:00 am on the Daylight Saving start and 2:00 am on the Daylight Saving End. At these times the Local time wiill change and hence to provide the correct local time the internal dictionary has to change which is facilitate on the DayLightHelper instance through the .Reset() extension method.

C#
[Test]
public void DateTimeHelperDayRollOver1AM_OnDaylightSavingStart()
{
    //Arrange
    var dateStart = DateTime.ParseExact("2015/03/28 23:30", "yyyy/MM/dd HH:mm", CultureInfo.InvariantCulture);
    var dateRollOverPre1am = DateTime.ParseExact("2015/03/29", "yyyy/MM/dd", CultureInfo.InvariantCulture);
    var dateRollOverpsot1am = DateTime.ParseExact("2015/03/29 01:00", "yyyy/MM/dd HH:mm", CultureInfo.InvariantCulture);
    var dateTimeHelper = new DateTimeHelper(dateStart, TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
    var expectedReset = new Dictionary<int, string>
    {
        {1, "23:00"},{2,"00:00"},{3, "01:00"},{4,"02:00"},{5, "03:00"},{6,"04:00"},
        {7, "05:00"},{8,"06:00"},{9, "07:00"},{10,"08:00"},{11, "09:00"},{12,"10:00"},
        {13,"11:00"},{14,"12:00"},{15, "13:00"},{16,"14:00"},{17, "15:00"},{18,"16:00"},
        {19,"17:00"},{20,"18:00"},{21, "19:00"},{22,"20:00"},{23, "21:00"},{24,"22:00"}
    };
 
    var expectedReset1 = new Dictionary<int, string>
    {
        {1, "23:00"},{2,"00:00"},{3, "02:00"},{4,"03:00"},{5, "04:00"},{6,"05:00"},
        {7, "06:00"},{8,"07:00"},{9, "08:00"},{10,"09:00"},{11, "10:00"},{12,"11:00"},
        {13,"12:00"},{14,"13:00"},{15, "14:00"},{16,"15:00"},{17, "16:00"},{18,"17:00"},
        {19,"18:00"},{20,"19:00"},{21, "20:00"},{22,"21:00"},{23, "22:00"}
    };
 
    //Act
    var dateTimeHelperReset = dateTimeHelper.Reset(dateRollOverPre1am,
        TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
 
    //Assert
    CollectionAssert.AreEquivalent(expectedReset,dateTimeHelperReset.UtcIndexToLocalHourMap);
 
    var dateTimeHelperReset1 = dateTimeHelperReset.Reset(dateRollOverpsot1am,
        TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
 
    //Assert
    CollectionAssert.AreEquivalent(expectedReset1, dateTimeHelperReset1.UtcIndexToLocalHourMap);
 
}

[Test]
public void DateTimeHelperDayRollOver2AM_OnDaylightSavingEnd()
{
    //Arrange
    var dateStart = DateTime.ParseExact("2015/10/24 23:30", "yyyy/MM/dd HH:mm", CultureInfo.InvariantCulture);
    var dateRollOverPre2am = DateTime.ParseExact("2015/10/25", "yyyy/MM/dd", CultureInfo.InvariantCulture);
    var dateRollOverpsot2am = DateTime.ParseExact("2015/10/25 02:00", "yyyy/MM/dd HH:mm", CultureInfo.InvariantCulture);
    var dateTimeHelper = new DateTimeHelper(dateStart, TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
    var expectedReset = new Dictionary<int, string>
    {
        {1, "23:00"},{2,"00:00"},{3, "01:00"},{4,"02:00"},{5, "03:00"},{6,"04:00"},
        {7, "05:00"},{8,"06:00"},{9, "07:00"},{10,"08:00"},{11, "09:00"},{12,"10:00"},
        {13,"11:00"},{14,"12:00"},{15, "13:00"},{16,"14:00"},{17, "15:00"},{18,"16:00"},
        {19,"17:00"},{20,"18:00"},{21, "19:00"},{22,"20:00"},{23, "21:00"},{24,"22:00"}
    };

    var expectedReset1 = new Dictionary<int, string>
    {
        {1, "23:00"},{2,"00:00"},{3, "01:00"},{4,"01:00"},{5, "02:00"},{6,"03:00"},
        {7, "04:00"},{8,"05:00"},{9, "06:00"},{10,"07:00"},{11, "08:00"},{12,"09:00"},
        {13,"10:00"},{14,"11:00"},{15, "12:00"},{16,"13:00"},{17, "14:00"},{18,"15:00"},
        {19,"16:00"},{20,"17:00"},{21, "18:00"},{22,"19:00"},{23, "20:00"},{24,"21:00"},
        {25,"22:00"}
    };

    //Act
    var dateTimeHelperReset = dateTimeHelper.Reset(dateRollOverPre2am,
        TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));

    //Assert
    CollectionAssert.AreEquivalent(expectedReset, dateTimeHelperReset.UtcIndexToLocalHourMap);

    var dateTimeHelperReset1 = dateTimeHelperReset.Reset(dateRollOverpsot2am,
        TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));

    //Assert
    CollectionAssert.AreEquivalent(expectedReset1, dateTimeHelperReset1.UtcIndexToLocalHourMap);

}

The Test Results are below

Image 1

Installation

Installation is quite simple for Windows services and can vbe done like below

installutil /LogFile=svcinstalllog.txt AlanAamy.Net.RxTimedWindow.Service.exe

Console based Debugging

Services can be tweaked to run in console mode and that is quite simple to do regular integration testing , once when the unit tests are all done. You can debug the same service in console mode using command parameter -console in debug. I’ll keave this for you to explore in the code. But the results can be see bu configuring the Log4net to flush to the console as you can see the results below.

Image 2

Performance

The memory usage graph shows no steps in the private memory. This clearly indicates no memory leak albeit the snapshot window below is less. The CPU usage in a multi-core machine will show uniform distribution of load across the CPU clearly eliminating CPU affinity issues which in traditional thread programming would not take care of these issue without you writing quite an extra amount of code.

Image 3

Building From Source

  1. Move to your local git repository directory or any directory (with git init) in console.
  2. Clone repository.
    git clone https://github.com/arupalan/RxTimedWindow.git
  3. Move to source directory, update submodule and build.
    cd AlanAamy.Net.RxTimedWindow/
                          git submodule update --init --recursive
                          msbuild
    

License

This article, along with any associated source code and files, is licensed under The GNU Lesser General Public License (LGPLv3)