SALES: 1-800-867-1380

Event Hubs Programming Guide

Updated: June 16, 2015

This document describes programming with Azure Event Hubs using the Azure .NET SDK. It assumes a preliminary understanding of Event Hubs. For conceptual an overview of Event Hubs, see Event Hubs Overview.

Sending events to an Event Hub is accomplished either using HTTP POST or via an AMQP 1.0 connection. The choice of which to use when depends on the specific scenario being addressed. AMQP 1.0 connections are metered as brokered connections in Service Bus and are more appropriate in scenarios with frequent higher message volumes and lower latency requirements as they provide a persistent messaging channel.

Event Hubs are created and managed using the NamespaceManager class. When using the .NET managed APIs, the primary constructs for publishing data to Event Hubs are the EventHubClient and EventData classes. The Microsoft.ServiceBus.Messaging.EventHubClient class provides the AMQP communication channel over which events are sent to the Event Hub. The EventData class represents an event and is used to publish messages to an Event Hub. This class includes the body, some metadata, and header information about the event. Other properties are added to the EventData object as it passes through an Event Hub.

The .NET managed classes that support Event Hubs are part of the Microsoft.ServiceBus.dll assembly. The easiest way to reference the Service Bus API and to configure your application with all of the Service Bus dependencies is to download the Service Bus NuGet package. For more information, see Using the NuGet Service Bus Package. Alternatively, you can use the Package Manager Console in Visual Studio. To do so, issue the following command in the Package Manager Console window:

Install-Package WindowsAzure.ServiceBus

You can use the NamespaceManager class to create Event Hubs. For example:

var manager = new Microsoft.ServiceBus.NamespaceManager("");
var description = manager.CreateEventHub("MyEventHub");

In most cases, it is recommended that you use the CreateEventHubIfNotExists methods to avoid generating exceptions if the service restarts. For example:

var description = manager.CreateEventHubIfNotExists("MyEventHub");

All Event Hubs creation operations, including CreateEventHubIfNotExists, require Manage permissions on the namespace in question. If you want to limit the permissions of your publisher or consumer applications, you can avoid these create operation calls in production code when you use credentials with limited permissions.

The EventHubDescription class contains details about an Event Hub, including the authorization rules, the message retention interval, partition IDs, status, and path. You can use this class to update the metadata on an Event Hub.

The primary class for interacting with Event Hubs is the EventHubClient class. This class provides both sender and receiver capabilities. You can instantiate this class using the Create method, as shown in the following example.

var client = EventHubClient.Create(description.Path);

This method uses the Service Bus connection information in the App.config file, in the appSettings section. For an example of the appSettings XML used to store the Service Bus connection information, see the documentation for Microsoft.ServiceBus.Messaging.EventHubClient.Create(System.String).

Another option is to create the client from a connection string. This option works well when using Azure worker roles, because you can store the string in the configuration properties for the worker. For example:

EventHubClient.CreateFromConnectionString("<your connectiong string>");

The connection string will be in the same format as it appears in the App.config file for the previous methods:


Finally, it is also possible to create an EventHubClient object from a MessagingFactory instance, as shown in the following code example.

var factory = MessagingFactory.CreateFromConnectionString("<your connection string>");
var client = factory.CreateEventHubClient("MyEventHub");

It is important to note that additional EventHubClient objects created from a messaging factory instance will reuse the same underlying TCP connection. Therefore, these objects have a client-side limit on throughput. The Create method reuses a single messaging factory. If you need very high throughput from a single sender, then you can create multiple message factories and one EventHubClient object from each messaging factory.

You can send events to an Event Hub by creating an EventData instance and sending it via the Send method. This method takes a single EventData instance parameter and synchronously sends it to an Event Hub.

The EventData class has four overloaded constructors that take a variety of parameters, such as an object and serializer, a byte array, or a stream: Microsoft.ServiceBus.Messaging.EventData. It is also possible to instantiate the EventData class and set the body stream afterwards. When using JSON with the EventData class, you can use Encoding.UTF8.GetBytes() to retrieve the byte array for a JSON-encoded string.

The EventData class has a PartitionKey property that enables the sender to specify a value that is hashed to produce a partition assignment. Using a partition key ensures that all the events with the same key are sent to the same partition in the Event Hub. Common partition keys include user session IDs and unique sender IDs. The PartitionKey property is optional and can be provided when using the Microsoft.ServiceBus.Messaging.EventHubClient.Send(Microsoft.ServiceBus.Messaging.EventData) or Microsoft.ServiceBus.Messaging.EventHubClient.SendAsync(Microsoft.ServiceBus.Messaging.EventData) methods. If you do not provide a value for PartitionKey, as they are sent events are distributed to partitions using a round-robin model.

Sending events in batches can dramatically increase throughput. The SendBatch method takes an IEnumerable parameter of type EventData and sends the entire batch as an atomic operation to the Event Hub.

public void SendBatch(IEnumerable<EventData> eventDataList);

It is important to note that a single batch must not exceed the 256KB limit of an event. Additionally, each message in the batch uses the same publisher identity. It is the responsibility of the sender to ensure that the batch does not exceed the maximum event size. If it does, a client Send error is generated.

You can also send events to an Event Hub asynchronously. Sending asynchronously can increase the rate at which a client is able to send events. Both the Send and SendBatch methods are available in asynchronous versions that return a Task object. While this technique can increase throughput, it can also cause the client to continue to send events even while it is being throttled by the Event Hubs service and can result in the client experiencing failures or lost messages if not properly implemented. In addition, you can use the RetryPolicy property on the client to control client retry options.

Although it is most common to send events to an Event Hub with a partition key, in some cases you might want to send events directly to a given partition. For example:

var partitionedSender = client.CreatePartitionedSender(description.PartitionIds[0]);

CreatePartitionedSender returns an EventHubSender object that you can use to publish events to a specific Event Hub partition.

Event Hubs has two primary models for event consumption: direct receivers and higher-level abstractions, such as EventProcessorHost. Direct receivers are responsible for their own coordination of access to partitions within a consumer group.

The most direct way to read from a partition within a consumer group is to use the EventHubReceiver class. To create an instance of this class, you must use an instance of the EventHubConsumerGroup class. In the following code example, the partition ID must be specified when creating the receiver for the consumer group.

EventHubConsumerGroup group = client.GetDefaultConsumerGroup();
var receiver = group.CreateReceiver(client.GetRuntimeInformation().PartitionIds[0]);

The CreateReceiver method has several overloads that facilitate control over the reader being created. These methods include specifying an offset as either a string or timestamp, and the ability to specify whether to include this specified offset in the returned stream, or start after it. After you create the receiver, you can start receiving events on the returned object. The Receive method has four overloads that control the receive operation parameters, such as batch size and wait time. You can use the asynchronous versions of these methods to increase the throughput of a consumer. For example:

bool receive = true;
string myOffset;
    var message = receiver.Receive();
    myOffset = message.Offset;
    string body = Encoding.UTF8.GetString(message.GetBytes());
    Console.WriteLine(String.Format("Received message offset: {0} \nbody: {1}", myOffset, body));

With respect to a specific partition, the messages are received in the order in which they were sent to the Event Hub. The offset is a string token used to identify a message in a partition.

It is important to note that a single partition within a consumer group cannot have more than five concurrent readers connected at any time. As readers connect or become disconnected, their sessions might stay active for several minutes before the service recognizes that they have disconnected. During this time, reconnecting to a partition may fail. For an example of writing a direct receiver for Event Hubs, see the Service Bus Event Hubs Direct Receivers sample.

The EventProcessorHost class processes data from Event Hubs. You should use this implementation when building event readers on the .NET platform. EventProcessorHost provides a thread-safe, multi-process safe runtime environment for event processor implementations that also provides checkpointing and partition lease management.

To use the EventProcessorHost class, you can implement IEventProcessor. This interface contains three methods:

To start event processing, instantiate EventProcessorHost, providing the appropriate parameters for your Event Hub. Then call RegisterEventProcessorAsync to register your IEventProcessor implementation with the runtime. At this point, the host will attempt to acquire a lease on every partition in the Event Hub using a “greedy” algorithm. These leases will last for a given timeframe and then must be renewed. As new nodes, worker instances in this case, come online, they place lease reservations and over time the load shifts between nodes as each attempts to acquire more leases.

Event Processor Host

Over time, an equilibrium is established. This dynamic capability enables CPU-based autoscaling to be applied to consumers for both scale up and scale down. As Event Hubs do not have a direct concept of message counts, average CPU utilization is often the best mechanism to measure back end or consumer scale. If publishers begin to publish more events than consumers can process, the CPU increase on consumers can be used to cause an auto-scale on worker instance count.

The EventProcessorHost class also implements an Azure storage based checkpointing mechanism. This mechanism stores the offset on a per partition basis, so that each consumer can determine what the last checkpoint from the previous consumer was. As partitions transition between nodes via leases, this is the synchronization mechanism that facilitates load shifting.

In addition to the advanced runtime features of EventProcessorHost, Event Hubs enables publisher revocation in order to block specific publishers from sending event to an Event Hub. These features are particularly useful in situations where a publisher’s token has been compromised, or a software update is causing them to behave inappropriately. In these situations, the publisher’s identity, which is part of their SAS token, can be blocked from publishing events.

For more information about publisher revocation and how to send to Event Hubs as a publisher, see the Service Bus Event Hubs Large Scale Secure Publishing sample.

See Also

Other Resources

Event Hubs Code Samples

Was this page helpful?
(1500 characters remaining)
Thank you for your feedback
© 2015 Microsoft