Azure: Event Hub - A First Look

4 Jun 2015 | CPOL | 12 min read

Over the next few weeks, I am going to be looking at a couple of things I have had on my backlog for a while (I need to get these done so I can make my pushy work colleague happy by learning Erlang). One of those things is Azure Event Hubs.

Event Hubs come under the Azure Service Bus umbrella, but are quite different. They provide high-throughput pub/sub at massive scale, with low latency and high reliability. To be honest, this post will not add much more than you could find on MSDN; in fact, even the demo associated with this post comes directly from MSDN. In the next post(s) in this series, however, I will be showing some more novel uses of Event Hub(s), which will be my own material.

I guess if you have not heard of Azure Event Hubs, there will still be some goodness in here, even if I have poached a lot of the content for this post (please forgive me) from MSDN.

Event Hubs provides a message stream handling capability and though an Event Hub is an entity similar to queues and topics, it has very different characteristics than traditional enterprise messaging. Enterprise messaging scenarios commonly require a number of sophisticated capabilities such as sequencing, dead-lettering, transaction support, and strong delivery assurances, while the dominant concern for event ingestion is high throughput and processing flexibility for event streams. Therefore, the Azure Event Hubs capability differs from Service Bus topics in that it is strongly biased towards high throughput and event processing scenarios. As such, Event Hubs does not implement some of the messaging capabilities that are available for topics. If you need those capabilities, topics remain the optimal choice.

An Event Hub is created at the namespace level in Service Bus, similar to queues and topics. Event Hubs uses AMQP and HTTP as its primary API interfaces.

https://msdn.microsoft.com/library/azure/dn836025.aspx

Partitions

In order to create such a high-throughput ingestor (Event Hub), Microsoft used the idea of partitions. I like to use this set of images to help me understand what partitions bring to the table.

Regular messaging may be something like this:

Image 1

Whilst an Event Hub may be more like this (many lanes):

Image 2

What I am trying to show is that with only one lane, less traffic can travel, whereas with more lanes, more traffic will flow.

Event Hubs get their throughput by holding n-many partitions. Using the Azure portal, the maximum number of partitions you may allocate is 16; this may be extended if you contact the Microsoft Azure Service Bus team. Each partition can be thought of as a queue (FIFO) of messages. Messages are held for a configurable amount of time. This setting is global across the entire Event Hub, and as such will affect messages held across ALL partitions.

To control which partition an event lands in, your publishing code should assign a partition key; events that share a key are routed to the same partition. If your publishing code does not supply a partition key, a round-robin assignment is used, which keeps the partitions fairly balanced in terms of throughput.
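To make the two routing rules concrete, here is a minimal sketch of how a publisher-side router could behave. It is purely illustrative: the PartitionRouter class is hypothetical, and the FNV-1a hash is a stand-in I picked because it is deterministic, not the hash the Event Hubs service actually uses. The class is also not thread safe.

```csharp
using System;
using System.Text;

// Illustrative model of the routing rules described above (hypothetical class).
public sealed class PartitionRouter
{
    private readonly int partitionCount;
    private int nextRoundRobin;

    public PartitionRouter(int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        this.partitionCount = partitionCount;
    }

    // Returns the partition index for an event. A null key means "no key supplied".
    public int Route(string partitionKey)
    {
        if (partitionKey == null)
        {
            // No key: hand out partitions in turn so the load stays balanced.
            int partition = nextRoundRobin;
            nextRoundRobin = (nextRoundRobin + 1) % partitionCount;
            return partition;
        }

        // Same key always hashes to the same partition.
        return (int)(StableHash(partitionKey) % (uint)partitionCount);
    }

    // FNV-1a over the UTF-8 bytes: a stand-in hash, NOT the service's real one.
    private static uint StableHash(string value)
    {
        uint hash = 2166136261;
        foreach (byte b in Encoding.UTF8.GetBytes(value))
        {
            hash ^= b;
            hash *= 16777619;
        }
        return hash;
    }
}
```

With four partitions, successive calls to Route(null) walk through partitions 0, 1, 2, 3 and wrap around, while Route("device-42") returns the same partition every time it is called.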

Stream Offsets

Within each partition, every event has an offset. The offset can be thought of as a client-side cursor, giving the position in the message stream that has been dealt with. It should be maintained by the event consumer, and may be used to indicate the position in the stream to start processing from should communications to the Event Hub be lost.

Checkpoints

Checkpoints are the responsibility of the consumer, and mark or commit their position within a partition event stream. The consumer can inform the Event Hub when it considers an event stream complete. If a consumer disconnects from a partition, when connection is re-established, it begins reading at the checkpoint that was previously submitted. Due to the fact that event data is held for a specified period, it is possible to return older data by specifying a lower offset from this checkpointing process. Through this mechanism, checkpointing enables both failover resiliency and controlled event stream replay.
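The relationship between offsets, checkpoints and replay is easier to see in a toy model. The sketch below is an in-memory stand-in for a single partition, not the Event Hubs API: the PartitionLog class is hypothetical, and offsets are plain list indexes here, whereas the real service uses its own offset values.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of one partition (hypothetical class, not part of any SDK).
public sealed class PartitionLog
{
    private readonly List<string> events = new List<string>();
    private int checkpoint; // offset of the next event to read after a reconnect

    public void Append(string payload)
    {
        events.Add(payload);
    }

    // Reads every retained event at or after the given offset.
    public IReadOnlyList<string> ReadFrom(int offset)
    {
        if (offset < 0 || offset > events.Count) throw new ArgumentOutOfRangeException(nameof(offset));
        return events.GetRange(offset, events.Count - offset);
    }

    // Resumes from the last committed checkpoint, as a reconnecting consumer would.
    public IReadOnlyList<string> ReadFromCheckpoint()
    {
        return ReadFrom(checkpoint);
    }

    // Commits the consumer's position: everything before 'offset' counts as processed.
    public void Checkpoint(int offset)
    {
        if (offset < 0 || offset > events.Count) throw new ArgumentOutOfRangeException(nameof(offset));
        checkpoint = offset;
    }
}
```

After appending "a", "b" and "c" and calling Checkpoint(2), ReadFromCheckpoint() yields only "c" (failover resume), while ReadFrom(0) still returns all three events (controlled replay using a lower offset).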

So How About A Demo

I simply followed the getting started example, which you can find here.

The Publisher

Here is the entire code for a FULLY working Event Hub publisher:

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
using System.Threading;
using Microsoft.ServiceBus.Messaging;
 
namespace Sender
{
    class Program
    { 
        static string eventHubName = "{Your hub name}";
        static string connectionString = "{Your hub connection string}";    
 
        static void Main(string[] args)
        {
            Console.WriteLine("Press Ctrl-C to stop the sender process");
            Console.WriteLine("Press Enter to start now");
            Console.ReadLine();
            SendingRandomMessages();
        }
  
        static void SendingRandomMessages()
        {
            var eventHubClient = 
                EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
            while (true)
            {
                try
                {
                    var message = Guid.NewGuid().ToString();
                    Console.WriteLine("{0} > Sending message: {1}", 
                        DateTime.Now, message);
 
                    EventData eventData = new EventData(
                        Encoding.UTF8.GetBytes(message));
 
                    //This is how you can include metadata
                    //eventData.Properties["someProp"] = "MyEvent"
 
                    //this is how you would set the partition key
                    //eventData.PartitionKey = 1.ToString();
                    eventHubClient.Send(eventData);
                }
                catch (Exception exception)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("{0} > Exception: {1}", 
                        DateTime.Now, exception.Message);
                    Console.ResetColor();
                }
 
                Thread.Sleep(5000);
            }
        }
    }
}

As the code above shows, there is an EventHubClient class that you use to send events, and a new event is created using the EventData class. Although I have not used these features, the commented-out lines also show how to associate metadata with the event and how to set a partition key for it.

The Consumer

The consumer is a little trickier, but not by much: there are only two classes of interest in the demo app. The main entry point creates an EventProcessorHost.

To alleviate the overhead of managing partitions and offsets by hand, the Service Bus team has created EventProcessorHost, an intelligent agent for .NET consumers that manages partition access and per-partition offsets for consumers.

To use this class, you first must implement the IEventProcessor interface which has three methods:

  • OpenAsync
  • CloseAsync
  • ProcessEventsAsync

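Here is the demo's main entry point, which creates the EventProcessorHost (the IEventProcessor implementation itself appears later in this post):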

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
using Microsoft.ServiceBus.Messaging;
using Microsoft.Threading;
 
namespace Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncPump.Run(MainAsync);
        } 
 
        static async Task MainAsync()
        {
            string eventHubConnectionString = "{Your hub connection string}";
            string eventHubName = "{Your hub name}";
            string storageAccountName = "{Your storage account name}";
            string storageAccountKey = "{Your storage account key}";
            string storageConnectionString = 
                string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
                storageAccountName, storageAccountKey);
 
            string eventProcessorHostName = Guid.NewGuid().ToString();
            EventProcessorHost eventProcessorHost = 
                new EventProcessorHost(
                    eventProcessorHostName, 
                    eventHubName, 
                    EventHubConsumerGroup.DefaultGroupName, 
                    eventHubConnectionString, storageConnectionString);
            var epo = new EventProcessorOptions()
            {
                MaxBatchSize = 100,
                PrefetchCount = 1,
                ReceiveTimeOut = TimeSpan.FromSeconds(20)
            };
            await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo); 
 
            Console.WriteLine("Receiving. Press enter key to stop worker.");
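            Console.ReadLine();
 
            // Release the partition leases cleanly before exiting
            await eventProcessorHost.UnregisterEventProcessorAsync();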
        }
    }
}

After implementing this class, instantiate EventProcessorHost, providing the necessary parameters to the constructor.

  • Hostname – be sure not to hard code this; each instance of EventProcessorHost must have a unique value for this within a consumer group.
  • EventHubPath – this is an easy one.
  • ConsumerGroupName – also an easy one; “$Default” is the name of the default consumer group, but it generally is a good idea to create a consumer group for your specific aspect of processing.
  • EventHubConnectionString – This is the connection string to the particular event hub, which can be retrieved from the Azure portal. This connection string should have Listen permissions on the Event Hub.
  • StorageConnectionString – This is the storage account that will be used for partition distribution and leases. When checkpointing, the latest offset values will also be stored here.

Finally, call RegisterEventProcessorAsync on the EventProcessorHost and register your implementation of IEventProcessor. At this point, the agent will begin obtaining leases for partitions and creating receivers to read from them. For each partition for which a lease is acquired, an instance of your IEventProcessor class will be created and then used for processing events from that specific partition.

Lease Management

Checkpointing is not the only use of the storage connection string performed by EventProcessorHost. Partition ownership (that is reader ownership) is also performed for you. This way only a single reader can read from any given partition at a time within a consumer group. This is accomplished using Azure Storage Blob Leases and implemented using Epoch. This greatly simplifies the auto-scale nature of EventProcessorHost. As an instance of EventProcessorHost starts it will acquire as many leases as possible and begin reading events. As the leases draw near expiration EventProcessorHost will attempt to renew them by placing a reservation. If the lease is available for renewal the processor continues reading, but if it is not the reader is closed and CloseAsync is called – this is a good time to perform any final cleanup for that partition.

EventProcessorHost has a member PartitionManagerOptions. This member allows for control over lease management. Set these options before registering your IEventProcessor implementation.

Controlling the Runtime

Additionally, the call to RegisterEventProcessorAsync allows for a parameter EventProcessorOptions. This is where you can control the behavior of the EventProcessorHost itself. There are four properties and one event that you should be aware of.

  • MaxBatchSize – This is the maximum size of the collection the user wants to receive in an invocation of ProcessEventsAsync. Note that this is not the minimum, only the maximum. If there are not this many messages to be received the ProcessEventsAsync will execute with as many as were available.
  • PrefetchCount – This is a value used by the underlying AMQP channel to determine the upper limit of how many messages the client should receive. This value should be greater than or equal to MaxBatchSize.
  • InvokeProcessorAfterReceiveTimeout – Setting this parameter to true will result in ProcessEventsAsync being called when the underlying call to receive events on a partition times out. This is useful for taking time-based actions during periods of inactivity on the partition.
  • InitialOffsetProvider – This allows a function pointer or lambda expression to be set that will be called to provide the initial offset when a reader begins reading a partition. Without setting this, the reader will start at the oldest event unless a JSON file with an offset has already been saved in the storage account supplied to the EventProcessorHost constructor. This is useful when you want to change the behavior of reader start up. When this method is invoked, the object parameter will contain the partition id that the reader is being started for.
  • ExceptionReceived – This event allows you to receive notification of any underlying exceptions that occur in the EventProcessorHost. If things aren’t working as you expect, this is a great place to start looking.
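As a rough sketch of how those options might be wired up, the snippet below builds an EventProcessorOptions instance using the property and event names from the list above. It needs the same Service Bus package as the demo and I have not run it as part of this post. The ProcessorOptionsFactory class and the PrefetchCount value are made up for illustration, and two details are assumptions on my part: that returning a DateTime from InitialOffsetProvider starts the reader at events enqueued after that time, and that the event arguments expose the failure through an Exception property.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper; only the option names come from the list above.
public static class ProcessorOptionsFactory
{
    public static EventProcessorOptions Create()
    {
        var options = new EventProcessorOptions
        {
            MaxBatchSize = 100,
            // Kept greater than or equal to MaxBatchSize, per the guidance above
            PrefetchCount = 300,
            // Call ProcessEventsAsync even when a receive times out with no events
            InvokeProcessorAfterReceiveTimeout = true,
            // Assumption: a DateTime return value means "start from events after this time"
            InitialOffsetProvider = partitionId => DateTime.UtcNow
        };

        // Assumption: the event args carry the underlying exception in e.Exception
        options.ExceptionReceived += (sender, e) =>
            Console.WriteLine("EventProcessorHost error: {0}", e.Exception);

        return options;
    }
}
```

The result would be passed to RegisterEventProcessorAsync in place of the epo variable used in the demo's entry point.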

Here is the demo code implementation:

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
using Microsoft.ServiceBus.Messaging;
using System.Diagnostics;
 
namespace Receiver
{
    class SimpleEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;
 
        async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", 
                context.Lease.PartitionId, reason);
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }
 
        Task IEventProcessor.OpenAsync(PartitionContext context)
        {
            Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", 
                context.Lease.PartitionId, context.Lease.Offset);
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
            return Task.FromResult<object>(null);
        }
 
        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, 
            IEnumerable<EventData> messages)
        {
            foreach (EventData eventData in messages)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());
 
                Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                    context.Lease.PartitionId, data));
            }
 
            //Call checkpoint every 5 minutes, so that worker can resume processing 
            //from the 5 minutes back if it restarts.
            if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
            {
                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    }
}

This code probably needs a little explanation, and one of the best explanations you are likely to find is over on the Service Bus team's web site, which again I will blatantly steal here:

Thread Safety & Processor Instances

It’s important to know that by default EventProcessorHost is thread safe and will behave in a synchronous manner as far as your instance of IEventProcessor is concerned. When events arrive for a particular partition, ProcessEventsAsync will be called on the IEventProcessor instance for that partition and will block further calls to ProcessEventsAsync for the particular partition. Subsequent messages and calls to ProcessEventsAsync will queue up behind the scenes as the message pump continues to run in the background on other threads. This thread safety removes the need for thread safe collections and dramatically increases performance.

Receiving Messages

Each call to ProcessEventsAsync will deliver a collection of events. It is your responsibility to do whatever it is you intend to do with these events. Keep in mind that you want to keep whatever it is you’re doing relatively fast – i.e., don’t try to do many processes from here – that’s what consumer groups are for. If you need to write to storage and do some routing it is generally better to use two consumer groups and have two IEventProcessor implementations that run separately.

At some point during your processing, you’re going to want to keep track of what you have read and completed. This will be critical if you have to restart reading – so you don’t start back at the beginning of the stream. EventProcessorHost greatly simplifies this with the concept of Checkpoints. A Checkpoint is a location, or offset, for a given partition, within a given consumer group, where you are satisfied that you have processed the messages up to that point. It is where you are currently “done”. Marking a checkpoint in EventProcessorHost is accomplished by calling the CheckpointAsync method on the PartitionContext object. This is generally done within the ProcessEventsAsync method but can be done in CloseAsync as well.

CheckpointAsync has two overloads: the first, with no parameters, checkpoints to the highest event offset within the collection returned by ProcessEventsAsync. This is a “high water mark” in that it is optimistically assuming you have processed all recent events when you call it. If you use this method in this way, be aware that you are expected to perform this after your other event processing code has returned. The second overload allows you to specify an EventData instance to checkpoint to. This allows you to use a different type of watermark to checkpoint to. With this, you could implement a “low water mark” – the lowest sequenced event you are certain has been processed. This overload is provided to enable flexibility in offset management.
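One way to read the "low water mark" idea is: when events in a batch finish out of order, checkpoint only as far as the last event before the first gap. The sketch below computes that value from a set of completed sequence numbers. It is my own interpretation rather than anything from the SDK, and the Watermarks class is hypothetical; the result would identify which EventData to hand to the second CheckpointAsync overload.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating one interpretation of a "low water mark".
public static class Watermarks
{
    // Returns the highest sequence number N such that every event from
    // 'firstSequence' through N has completed, or null when the first
    // event in the batch is still outstanding.
    public static long? LowWaterMark(long firstSequence, ISet<long> completed)
    {
        long? mark = null;
        for (long seq = firstSequence; completed.Contains(seq); seq++)
        {
            mark = seq;
        }
        return mark;
    }
}
```

For a batch starting at sequence 10 where events 10, 11 and 13 have completed, the low water mark is 11: event 12 is still in flight, so checkpointing past it would risk skipping it after a restart.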

When the checkpoint is performed, a JSON file with partition specific information, the offset in particular, is written to the storage account supplied in the constructor to EventProcessorHost. This file will be continually updated. It is critical to consider checkpointing in context – it would be unwise to checkpoint every message. The storage account used for checkpointing probably wouldn’t handle this load, but more importantly checkpointing every single event is indicative of a queued messaging pattern for which a Service Bus Queue may be a better option than an Event Hub. The idea behind Event Hubs is that you will get at least once delivery at great scale. By making your downstream systems idempotent it is easy to recover from failures or restarts that result in the same events being received multiple times.
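A minimal sketch of what "idempotent downstream" could look like is shown below. It assumes events arrive in order within a partition (the FIFO behaviour described earlier) and that each event carries an increasing sequence number. The IdempotentHandler class is hypothetical, and it keeps its state in memory only; a real system would need to persist that state alongside the side effect.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical de-duplicating wrapper around a downstream side effect.
public sealed class IdempotentHandler
{
    // Highest sequence number already applied, per partition.
    private readonly Dictionary<string, long> lastApplied = new Dictionary<string, long>();

    // Runs the side effect only for events newer than anything already applied
    // on that partition; returns false for a redelivered event.
    public bool TryHandle(string partitionId, long sequenceNumber, Action sideEffect)
    {
        long last;
        if (lastApplied.TryGetValue(partitionId, out last) && sequenceNumber <= last)
        {
            return false;
        }

        sideEffect();
        lastApplied[partitionId] = sequenceNumber;
        return true;
    }
}
```

If a worker restarts from a five-minute-old checkpoint and the same events are delivered again, TryHandle returns false for each of them and the side effect runs only once per event.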

Shutting Down Gracefully

Finally, EventProcessorHost.UnregisterEventProcessorAsync allows for the clean shut down of all partition readers and should always be called when shutting down an instance of EventProcessorHost. Failure to do this can cause delays when starting other instances of EventProcessorHost due to lease expiration and Epoch conflicts.

When you run this demo code, you will see that 16 partitions are initialized and then messages are dispatched to the partitions.

You can grab a starter for this demo from here, though you WILL need to create an Event Hub in Azure as well as a Storage account. As I said, full instructions are available on MSDN; I simply followed the getting started example, which you can find here.

Image 3

This post adds absolutely ZERO to the example shown in the link above, and I have borrowed A LOT of material from MSDN. That said, if you have not heard of the Azure Event Hub, you may have learnt something here. In my next post, however (which may become an article, where I like to show original work), I will be looking to use an Azure Event Hub along with the Azure Stream Analytics service, which I think should be quite cool, and original. I am sorry this post is so borrowed, but I could not have said it better myself.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)