June 2015

Volume 30 Number 6


Azure Insider - Event Hubs for Analytics and Visualization, Part 2

By Bruno Terkaly | June 2015

When using Microsoft Azure Event Hubs to process massive amounts of data, it’s important to focus on both the delivery and consumption side. This is part two of a three-part series of articles about data visualization for Internet of Things (IoT) scenarios. In the April issue (msdn.microsoft.com/magazine/dn948106), I discussed ingesting data at massive scale using Azure Event Hubs, a technology that’s part of the Service Bus offering and supports information publishers (Raspberry Pi) and information consumers (C# Program) at scale. In this article, I’ll focus on the consumption side, C# code that reads from Event Hubs and persists data in a permanent store.

This article is broken into four pillars. I’ll start with the architecture and illustrate message consumption and storage. Second, I’ll discuss the tasks to complete at the Azure Portal. Third, I’ll describe the two different data stores I’ll use and explain why I chose them. And finally, I’ll conclude with an explanation of the client C# code that reads the messages from Event Hubs, as well as the code that stores the messages in Azure SQL Database and DocumentDB.

Event Hubs Architecture

After reading messages from Azure Event Hubs, you’ll want to persist those messages to both Azure SQL Database and DocumentDB. Azure SQL Database is a relational Database as a Service (DaaS) that’s scalable to thousands of databases. DocumentDB is a fully managed, scalable, NoSQL document database service (read more about DocumentDB at bit.ly/1oWZP7i). This architecture is displayed in Figure 1.

Overview of Azure Event Hubs Architecture
Figure 1 Overview of Azure Event Hubs Architecture

Tasks to Complete

To make the solution come to life, I’ll need to accomplish several tasks at the portal. There are two categories of tasks to perform at the Azure Portal. The first category is provisioning for Event Hubs, Azure SQL Database and DocumentDB. I provisioned the Event Hubs in the April article.

In order to read messages from Event Hubs and then persist those messages into Azure SQL Database and DocumentDB, I’ll need to obtain connection information from the Azure Portal. All the settings will be placed in the App.config in the Visual Studio solution for the C# application. I’ll create an ordinary console application using the Event Hubs SDK.

Finally, to assist with database operations, I’ll write a couple of stored procedures for Azure SQL Database. You’ll find the code for creating the table, as well as the two stored procedures, in Database­Code.txt as part of the Visual Studio solution. You can use SQL Server Management Studio to build out the database table and the stored procedures. Use the connection information from the Azure Portal to attach the Azure SQL Database sitting in the Microsoft datacenter to your local copy of Visual Studio. For more information on how to do this, read the Azure Documentation on it at bit.ly/1K1BIeM.

In April, I illustrated the provisioning process for Service Bus Event Hubs. To walk through some simple tutorials, check out “Get Started with Event Hubs” at bit.ly/1F2gp9H.

Once you’ve provisioned Event Hubs, you’ll receive an endpoint. You’ll need to copy the connection information for this and the point that will be placed into App.config when writing the C# code. You’ll also need to provision an Azure SQL Database at the Azure Portal. For more on this process read the documentation at bit.ly/1Enln5c.

Once you’ve created the database, there are three more easy tasks. The first task is to create the temperatures table. The two remaining tasks have to do with two stored procedures you’ll use to clear out old messages and insert new messages. In the Visual Studio project, there’s a text file called DatabaseCode.txt with the table definition for temperatures, as well as the two stored procedures, CleanTable and InsertTempData.

Now that you’ve taken care of Event Hubs and the Azure SQL Database components, turn your attention to DocumentDB. You can learn more about provisioning this NoSQL data store at bit.ly/1IBnGQ5. You can also watch the video from Ryan CrawCour, one of the senior program managers on the team who has shepherded the product since inception.

Once you’ve provisioned DocumentDB at the portal, you’ll need to define a collection name and a database name. The way to think about DocumentDB is as a series of collections, and a collection as a series of documents. The analogy in the database world is the collection is the table and a document is the record. This is a loosely defined analogy because NoSQL data stores are generally schema-less. The collection name used in our application is called CityTempCollection and the database is called TemperatureDB.

Understand Data Storage Options

The starting point for this discussion is to establish the need for secondary data stores beyond what Event Hubs can natively provide. For example, one obvious reason to think about persistence is messages stored in Event Hubs aren’t permanent. Second, you can’t query messages with any sort of query language. You can only read them serially from Event Hub partitions, where each client reader maintains its own cursor in the message store. Third, Event Hubs gives you 84GB of event storage for the default 24-hour retention period.

Although there are many options, I’ve chosen Azure SQL Database and DocumentDB as the primary data stores for temperature-related messages/data. The advantages to Azure SQL Database are two-fold. The first relates to the fact that it’s cloud-hosted, which provides several other valuable attributes. It’s almost instantaneous to provision, it’s economical and it’s triple-replicated, making it fairly robust. 

Aside from those obvious benefits, there’s also a lot of business intelligence tooling built around Azure SQL Database, such as Power BI. This lets you build interactive reports with sophisticated visualization. You can build powerful dashboards that you can then consume from a browser and a mobile device.

Use DocumentDB as the second of two data stores. This has some clear advantages. It’s a fully managed service, so you don’t have to bother with infrastructure and individual virtual machines (VMs). Also, it comes with an enterprise service-level agreement, which is the case with most Azure-based services. Finally, DocumentDB is also schema-free.

Schema-free document stores are often considered desirable in the context of object orientation and inheritance. That’s because inheritance means you have objects with some attributes in common, but also some attributes specific to an object subtype.

With most relational databases, you would need a table for all possible attributes, leaving many of them null for fields that don’t apply. In a schema-less database, however, you can store different sets of optional properties. This works well when rendering HTML because JavaScript can check for an unknown optional property and call the appropriate function to output to a table for display.

One of the other advantages (which can also be seen as a disadvantage for some) for a schema-less database is it provides additional agility during development. You can add new features without restructuring the database. This makes it easy to maintain backward compatibility to data created by a previous version. The downside of this approach is writing queries against optional fields can become complex and convoluted.

One of the areas where a JSON-based data store like DocumentDB really shines is when using Node.js applications to expose the data to mobile or Web applications. The native JSON format, which is compact and expressive, makes it easy and natural with which to work. Consuming JSON-based data stores with HTTP and REST is also straightforward whether you’re using the Microsoft .NET Framework, JavaScript, Ruby, Java, PHP or Python. I’ll focus on Node.js applications reading DocumentDB data and exposing the data to mobile applications in the next installment.

Consume and Persist Messages

There are three common approaches to consuming messages from Event Hubs. As always, there’s a trade-off between what’s easy and what’s flexible. The easy way is with Stream Analytics. Another way is with direct receivers. Direct receivers are responsible for their own coordination of access to partitions within a consumer group, whereby your code directly addresses a partition ID when creating the receiver for the EventHubConsumerGroup object. Another way is with higher-level abstractions, such as Event­ProcessorHost. I’ll use this method because it’s simple enough, yet provides enough flexibility.

Stream Analytics is a fully managed service that makes it easy to ingest messages from Event Hubs. It’s easy to consume, transform and place those messages into an Azure SQL Database. Read more about this approach at bit.ly/1IRvPDc.

It’s a simple matter of creating a database, table and a series of queries. For example, you use a simple select statement to read messages from Event Hubs, “SELECT DeviceId, Temperature FROM input,” and place those messages into SQL Server. You can even chain queries recursively and create a series of queries that transform the data using a pipeline approach. This ability to filter messages through a series of SQL queries is a powerful abstraction.

One of the more interesting extensions available through Stream Analytics has to do with windowing. There’s often the requirement to perform some set-based computation (aggregation) or other operations over subsets of events that fall within some period of time. Because time is crucial in complex event-processing systems, it’s important to have a simple way to work with the time component of query logic in the system. In Azure Stream Analytics, these subsets of events are defined through windows to represent groupings by time. For a detailed explanation of the various types of supported time windows, check out bit.ly/1DIizfM.

Direct receivers let you target specific Event Hub partition IDs. Partitions provide value in two ways. First, they let you scale publishing and consuming messages. Second, they also let you segregate data into separate silos.

I’ll take the EventProcessorHost approach, which is an intelligent agent for .NET consumers that manages partition access and per partition offset for consumers. For a more detailed explanation, please read the ServiceBus Blog post at bit.ly/1aO5I19. Use EventProcessorHost to make it easy to read messages, transform them and write them to permanent storage.

I’ll write a custom C# program that leverages the Event Hubs SDK and write to both Azure SQL Database and DocumentDB (check out the Event Hubs Programming Guide at bit.ly/1IBrpNz). You can get all the source code for this at bit.ly/1aSFF99. I’ve stripped the password and secret keys, but you can get the connection information from the portal. To get all the code, install Git and run the issuing command: git clone. Some of the files you’ll find in the Visual Studio console application are shown in Figure 2. The main driver code can be found in Program.cs.

Figure 2 Code to Consume and Persist Messages

+==============+
|  Section 1   |
+==============+
internal class Program
{
  private static string eventHubConnectionString =             
    ConfigurationManager.AppSettings["eventHubConnectionString"];
  static private string eventHubName = 
    ConfigurationManager.AppSettings["eventHubName"];
  static private string storageAccountName =
    ConfigurationManager.AppSettings["storageAccountName"];
  static private string storageAccountKey =
    ConfigurationManager.AppSettings["storageAccountKey"];
  static private string storageConnectionString =
    string.Format("DefaultEndpointsProtocol=https;" +
    "AccountName={0};AccountKey={1}",
    storageAccountName, storageAccountKey);
  static private string eventProcessorHostName = Guid.NewGuid().ToString();
  private static void Main(string[] args)
  {
    // Start the main message consumption engine
    +==============+
    |  Section 2   |
    +==============+
    var eventProcessorHost =
      new EventProcessorHost(eventProcessorHostName,
      eventHubName, EventHubConsumerGroup.DefaultGroupName,
      eventHubConnectionString, storageConnectionString);
    // Asynchronously consume message from Event Hub
    eventProcessorHost.
      RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
    Console.WriteLine("Receiving. Press enter key to stop worker.");
    Console.ReadLine();
  }
}
+==============+
|  Section 3   |
+==============+
// SimpleEventProcessor.cs
class SimpleEventProcessor : IEventProcessor
{
  private DataManager dataManager = new DataManager(new SQLDatabaseManager());
  // ... Means omitted for brevity
  public SimpleEventProcessor()  ...
  async Task IEventProcessor.CloseAsync(PartitionContext context,
    CloseReason reason) ...
  Task IEventProcessor.OpenAsync(PartitionContext context) ...
  async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, 
    IEnumerable<EventData> messages)
  {
    // Loop through messages to insert
    foreach (EventData eventData in messages)
    {
      string data = Encoding.UTF8.GetString(eventData.GetBytes());
      // Comma separated so divide up fields
      string[] msg = data.Split(',');
      if (msg.Length > 2)
      {
        +==============+
        |  Section 4   |
        +==============+
        // Insert into SQL
        dataManager.InsertSqlMessage(msg[0], Convert.ToInt32(msg[1]),
          Convert.ToDouble(msg[2]));
        // Insert into global DocumentDB object
        // (global because of thread timing issues)
        Globals.DocDb.InsertEntry(msg[0], Convert.ToInt32(msg[1]),
          Convert.ToDouble(msg[2]));
      }
      Console.WriteLine(string.Format("Message received.  Partition: '{0}', " +
        "Data: '{1}'", context.Lease.PartitionId, data));
    }
    // Call checkpoint every 5 minutes, so that worker can resume
    // processing from the 5 minutes back if it restarts
    if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
    {
      await context.CheckpointAsync();
      lock (this)
      {
        this.checkpointStopWatch.Reset();
      }
    }
  }
}
+==============+
|  Section 5   |
+==============+
// DocumentDBManager.cs
public class DocumentDbManager
{
  // Omitted variables for brevity
  public string collectionName = "CityTempCollection";
  public string databaseName = "TemperatureDB";
  public async Task<bool> InsertEntry(
    string city, int month, double temperature)
  {
    dynamic document = null;
    try
    {
      +==============+
      |  Section 6   |
      +==============+
      // Check if City exists
      document = client.CreateDocumentQuery(documentCollection.DocumentsLink)
        .Where(d => d.Id == city).AsEnumerable().FirstOrDefault();
    }
    catch (Exception ex)
    {
      throw;
    }
    bool docExists = (document != null);
    // Document DOESN'T exist, yet
    if (!docExists)
    {
      +==============+
      |  Section 7   |
      +==============+
      var cityMonthTemperature = new CityMonthTemperature
      {
        Id = city,
        City = city
      };
      cityMonthTemperature.Temperatures[month - 1] = temperature;
      try
      {
        +==============+
        |  Section 8   |
        +==============+
        // Create, and set document to the return, yes --
        // here is where you reset the document object
        document = await client.CreateDocumentAsync(
          documentCollection.DocumentsLink, cityMonthTemperature);
      }
      catch (DocumentClientException ex)
      // Omitted for brevity   
    }
  }
}

Section 1 should come as no surprise. It retrieves some of the connection information from App.config. You can get all the connection information can be obtained from the portal. Section 2 is where you’ll actually instantiate EventProcessorHost, the core object in the Event Hubs SDK that lets you retrieve messages. You’ll find the underlying events tied to this object in Section 3, within the class SimpleEventProcessor.

The asynchronous callback, ProcessEventsAsync, is called by the SDK. This passes the parameter IEnumerable<EventData> messages, which you then parse to retrieve the messages stored in Event Hubs. Then parse the messages parameter, and insert the parsed message into SQL Database and DocumentDB in Section 4. You’ll find all the low-level code details for InsertSqlMessage and InsertEntry in the Visual Studio solution.

Section 5 represents the class that actually does the insert oper­ation for DocumentDB. The database name is TemperatureDB and collection name is CityTempCollection. Section 6 represents a query wherein you search for a city using a LINQ query. The logic here is a city may have been previously inserted. What you really want to do is update temperature data if the city exists. 

Section 7 represents the scenario where the city hasn’t been added. You create a simple .NET object that’s transformed into JSON data once the insert takes place. The underlying SDK takes care of this transformation. You insert the temperature into the appropriate month offset of the temperatures array. Finally, in Section 8, you actually update the document object.

The code to perform just an update of the temperature in the scenario where the city has already been inserted was omitted from this code snippet for brevity purposes, but you can find it in the GitHub repository, along with the entire Visual Studio solution, at bit.ly/1aSFF99.

Because DocumentDB is part of a preview offering, you need to use the new Azure Preview Portal at portal.azure.com. One of the nice features of DocumentDB is that there’s a Document Explorer that lets you create documents and data, as well as query data and view existing data, as shown in Figure 3.

Use DocumentDB to Create, Query and View Data
Figure 3 Use DocumentDB to Create, Query and View Data

In the April installment, I created a C program running in Linux to leverage the AMQP transport protocol to insert messages into Event Hubs. That code ran in an Azure-hosted Linux VM so I could easily port it into a Debian-based Raspberry Pi implementation. In short, the first article was about producing message publishing. This installment has been all about message consumption and persistent storage of messages. And in the final installment, I’ll address the ability to expose the persistent data to mobile clients and provide visualization of the underlying data.


Bruno Terkaly is a principal software engineer at Microsoft with the objective of enabling development of industry-leading applications and services across devices. He’s responsible for driving the top cloud and mobile opportunities across the United States and beyond from a technology-enablement perspective. He helps partners bring their applications to market by providing architectural guidance and deep technical engagement during the ISV’s evaluation, development and deployment. Terkaly also works closely with the cloud and mobile engineering groups, providing feedback and influencing the roadmap.

Thanks to the following Microsoft technical experts for reviewing this article: Ryan CrawCour, Dan Rosanova
Ryan CrawCour is 20-year database veteran who started out many years ago writing his first stored procedure for SQL Server 4.2. Many cursors, joins and stored procedures later he began exploring the exciting free world of NoSQL solutions. Ryan is now working with the DocumentDB product team in Redmond as a Program Manager helping shape the future of this all-new NoSQL Database-as-a-Service.

Dan Rosanova is a Senior Program Manager on the Azure Service Bus team responsible for Event Hubs, Messaging (Queues & Topics), and Relay. Dan has been working in the messaging and distributed computing space for sixteen years with a focus on hyper-scale and evolutionary computation.