March 2018

Volume 33 Number 3

[Azure]

Enterprise Data Integration Patterns with Azure Service Bus

By Stefano Tempesta

In the age of Big Data and machine learning, acquiring and managing information is critical, but doing so can be a complex undertaking because data is often more complex than you might realize. When considering how to make your application communicate with other IT systems, an effective design for data exchange is key to success. This article provides an overview and implementation of data integration processes between applications using Azure Service Bus.

Data Integration Design Patterns

Data flows in multiple directions across networks, applications, and volatile or persistent repositories. It may be exchanged record by record or in batches, through systems that talk to each other in real time or via scheduled synchronization jobs. Despite the variety of data integration “journeys,” it’s possible to identify common design patterns for how to address them in an enterprise context, where requirements include high availability, guaranteed delivery and security. Design patterns in software engineering are the most logical and proven sequences of steps to resolve a task. The four most common design patterns for data integration are broadcast, aggregation, bidirectional synchronization and correlation.

In this article, I introduce each of these data integration design patterns and describe their application in the context of Azure Service Bus. An approach to data integration that leverages an enterprise service bus (ESB) facilitates the implementation of these patterns in a very effective way, by simply defining source and target systems, frequency of communication, and the data format for input and output. Along with the description of each pattern, I include code samples to illustrate communication with Azure Service Bus.

Context and Requirements

I’ll start my journey to data integration by defining the enterprise context. In this case, let’s assume I’m building an e-commerce platform, with the typical requirements for an online catalog and shopping cart, and that I’ll publish my application in Azure. The e-commerce application is part of a larger ecosystem of IT systems, some exposed on a public cloud, some still hosted on a private datacenter. Thus, I’m operating in a truly hybrid context. The data integration requirements include the following capabilities:

  1. Transfer product sales information from the e-commerce application to an invoicing system and to social media platforms.
  2. Receive product availability information from an enterprise resource planning (ERP) application, and product descriptions from third-party vendor systems.
  3. Add location tracking to shipped parcels.
  4. Share customer data with partner organizations for co-marketing activities.

All of these integration requirements can be addressed using the design patterns mentioned earlier. Let’s take a closer look.

The Broadcast Pattern

For the first capability, I need to extract data concerning product sales from the e-commerce application and transfer it to multiple target systems—a financial system for issuing invoices and one or more social media platforms for promotion. The data flow direction is one-directional, from the application to external systems. I’m basically broadcasting the information to the world outside.

The broadcast integration pattern describes how to transfer data from an application to multiple target systems in a continuous real-time or near-real-time flow. This process is expected to be transactional: If a transaction completes successfully, data is committed at destination. If the transaction fails, the data transfer is aborted. It’s immediately obvious that this broadcast integration channel must be highly available and reliable, in order to avoid losing critical data in transit. Adopting an ESB as the mechanism for queuing data packets and guaranteeing delivery at destination becomes crucial.

Implementing the broadcast pattern closely resembles implementing the publish/subscribe pattern in Azure Service Bus, based on topics and subscriptions (bit.ly/2oOmTtM). Topics represent queues of messages to which recipient applications (subscribers) subscribe to receive updates when a message is posted. My e-commerce application publishes a message into a topic. The ESB acts as a message broker and guarantees delivery of the message at destination by “pushing” the message to the destination, which consists only of subscribed recipients.

Broadcasting a data packet from the e-commerce application essentially means publishing a message in a topic, and having a target application listening on a specific subscription. The broadcast pattern adds a transactional attribute to the data flow, with the possibility to cancel the transaction in case of delivery failure. As transactions cross system boundaries, they benefit from a “state machine” that retains a snapshot of the brokered message being transferred, before it’s read by all subscribed applications. If any of the subscribers fails to retrieve the message, the entire transaction is aborted, to ensure consistency across all systems involved.

The following code broadcasts a message to an Azure Service Bus topic and implements a state machine (bit.ly/29tKRT3) for tracking delivery of a message:

public class Broadcast
{
  public async Task Execute(Entity entity)
  {
    var client = TopicClient.CreateFromConnectionString(connectionString, topicName);
    var message = new BrokeredMessage(JsonConvert.SerializeObject(entity));
    await client.SendAsync(message);
  }

In case of error on delivery, the state machine moves the message to the “dead letter” queue in Azure Service Bus. The message at that point is no longer valid for data transfer and won’t be processed further.

Sending a message to a topic in Azure Service Bus requires a TopicClient connection and a BrokeredMessage to wrap the original entity and send it, asynchronously, to the bus. All required objects for connecting to Azure Service Bus are distributed in the WindowsAzure.ServiceBus NuGet package and are available in the Microsoft.ServiceBus.Messaging namespace.

The state machine is a singleton asynchronous dictionary containing transaction counters by topic. The dictionary keeps count of the number of active transactions—subscribers—that are waiting for a message on a specific topic from the Service Bus. The dictionary is thread-safe to allow for concurrent requests:

private static StateMachine _instance;
public static StateMachine Current => _instance ?? (_instance = new StateMachine());
protected ConcurrentDictionary<string, int> transactions =
  new ConcurrentDictionary<string, int>();

As shown in Figure 1, a subscriber application reads a message from the Service Bus topic by beginning a new transaction (using the BeginTransactionAsync method) for a specific topic on the state machine, and then handles the OnMessage event to obtain a copy of the entity. The entity is then processed internally; for example, it may be persisted by the recipient system. In case of error, the transaction is canceled.

Figure 1 Reading a Message from the Service Bus Topic

public async Task ReadMessageAsync()
{
  await StateMachine.Current.BeginTransactionAsync(topicName);
  var client = SubscriptionClient.CreateFromConnectionString(
    connectionString, topicName,
    subscriptionName);
  client.OnMessageAsync(async message =>
  {
    var entity = JsonConvert.DeserializeObject(message.GetBody<string>());
    try
    {
      Save(entity);
      await StateMachine.Current.SuccessAsync(message , topicName);
    }
    catch
    {
      await StateMachine.Current.CancelAsync(message , topicName);
    }
  });
}

Completing or aborting the transaction is managed by the state machine using either of two methods—SuccessAsync or Cancel­Async. SuccessAsync invokes CompleteAsync on the brokered message, which indicates that the message should be marked as processed and eventually deleted from the topic. This takes place only when all concurrent active transactions are completed:

public async Task<bool> SuccessAsync(BrokeredMessage message, string topicName)
{
  bool done = await EndTransactionAsync(topicName);
  int count = Current.transactions[topicName];
  // All concurrent transactions are done
  if (done && count == 0)
  {
     await message.CompleteAsync();
  }
  return done;
}

CancelAsync, in contrast, aborts the message broadcast by resetting the transaction counter for a topic. By calling the DeadLetterAsync method, the brokered message is then moved to the “dead letter” queue, where unsuccessfully processed messages are stored:

public async Task<bool> CancelAsync(BrokeredMessage message, string topicName)
{
  // Cancel the message broadcast -> Remove all concurrent transactions
  int count = Current.transactions[topicName];
  bool done = Current.transactions.TryUpdate(topicName, 0, count);
  if (done)
  {
    await message.DeadLetterAsync();
  }
  return done;
}

The Aggregation Pattern

The second requirement for my e-commerce platform is to share information about products from external systems and consolidate it into the Web portal. The direction of the data flow in this case is opposite to that of the broadcast pattern. The requirement now is to aggregate data from various sources into a single place. A simple approach would be to import data directly, point to point, from each source into the e-commerce application. But this isn’t a scalable solution, as it implies building a different connection for each external system that’s sending data to the target repository. Instead, by aggregating data in one process through an ESB, I eliminate the need for multiple one-way integrations, and mitigate concerns about data accuracy and consistency as data is processed in an atomic transaction.

What arises, though, is the challenge of merging data into a single entity without duplicating information or, worse, corrupting it. It’s often necessary to implement custom merge logic as part of the integration process, in order to track aggregated records sourced from different systems and store them in one or more entities in the application. I need a mapping table to associate record IDs in the different source databases with the entity ID in the target database. This mapping table is typically persisted in a high-transaction database, and updated by the data merge custom logic during the aggregation process.

As with the Broadcast pattern, the implementation of this integration pattern also reflects publishing/subscribing to an Azure Service Bus topic, with the difference that, in this case, my e-commerce application is the target system receiving data (subscribing to a topic) from other source systems via the ESB. The overall solution also needs to use some data merge logic and data mapping to track source record IDs and the entity ID at destination.

As you can see in Figure 2, reading messages from a topic consists of creating a subscription connection (SubscriptionClient), and handling the OnMessage event that’s raised by the subscription client when a new message is available in the topic. The received message contains an object sent by an external application, say a vendor sending product details. This object is then mapped to an entity in my system, using the entity map, and if the entity already exists in my database, I update it; otherwise I create a new one.

Figure 2 Reading Messages from a Topic

public class Aggregation
{
  public void Execute()
  {
    var client = SubscriptionClient.CreateFromConnectionString(
      connectionString, topicName, subscriptionName);
    client.OnMessage(message => {
      ProductEntity product =
        EntityMap.Instance.MapToEntity<ProductEntity>(message);
      // Persist the product
      var exists = Find(product.Id) != null;
      if (exists)
        Update(product);
      else
        Create(product);
      });
  }

The mapping process, implemented in the EntityMap class, consists of two important steps:

  1. It creates a map between the object in the external system and the entity in my database. This map identifies the external object by pairing the system name (such as “ERP,” “Vendor1,” “Vendor2”) with the primary key. The mapped entity is identified by its type in my application (Product, Customer, Order) and its ID.
  2. It builds an entity record using the properties of the external object. This is the custom merge logic, which can be as easy as using a library like AutoMapper (automapper.org) to map objects.

As shown in Figure 3, the object-entity map is a dictionary that associates a system name-primary key pair to an entity type-­entity id counterpart. An object in an external system is identified uniquely by the combination of system name and primary key, whereas an entity in my application is identified by the combination entity type and entity id.

Figure 3 The Object-Entity Map

System Name Primary Key Entity Type Entity Id
ERP ABC012345 Product FAE04EC0-301F-11D3-BF4B-00C04F79EFBC
Vendor 1 1000 Product FAE04EC0-301F-11D3-BF4B-00C04F79EFBC
ERP ABD987655 Product 2110F684-C277-47E7-B8B9-6F17A579D0CE
Vendor 2 1001 Product 2110F684-C277-47E7-B8B9-6F17A579D0CE

The map is populated by retrieving the system name and primary key from the properties of the brokered message, and then building an entity with AutoMapper, as shown in Figure 4.

Figure 4 Populating the Map

public T MapToEntity<T>(BrokeredMessage message) where T : Entity, new()
{
  string systemName = message.Properties["SystemName"] as string;
  string primaryKey = message.Properties["PrimaryKey"] as string;
  T entity = BuildEntity<T>(message);
  map.Add((systemName, primaryKey), (entity.GetType(), entity.Id));
  return entity;
}
private T BuildEntity<T>(BrokeredMessage message) where T : Entity, new()
{
  var source = JsonConvert.DeserializeObject(message.GetBody<string>());
  T entity = Mapper.Map<T>(source);
  return entity;
}

A brokered message can be enriched with any additional property, which the publisher application should set before sending the message to the ESB:

public async Task SendMessageToServiceBus(object obj)
{
  var client = TopicClient.CreateFromConnectionString(connectionString, topicName);
  var message = new BrokeredMessage(JsonConvert.SerializeObject(obj));
  message.Properties["SystemName"] = "Publisher System Name";
  message.Properties["PrimaryKey"] = "Object Primary Key";
  await client.SendAsync(message);
}

The Bidirectional Synchronization Pattern

Let’s look now at the third requirement: adding location tracking to shipped parcels. In more generic terms, I want to augment the attribute of an entity with additional attributes that are provided by a more specialized external application. For this reason, this pattern is sometimes referred to as the Augmentation pattern.

Systems involved in a bidirectional synchronization process with a third-party line-of-business application can extend their functionality beyond their boundaries. As an example, Dynamics 365 is a customer relationship management platform (and much more) that integrates natively with SharePoint for enterprise document management. Dynamics 365 still remains the master data source for all records, but documents are stored in SharePoint as an extension of entities in the CRM system. Basically, once two systems are in bidirectional sync, they behave as one system while still retaining their own dataset and, obviously, functionality. Data is distributed across the two systems, but is seen as a single entity through this seamless integration.

Most of the time, and as with Dynamics 365 and SharePoint, this direct and real-time synchronization is not implemented with a service bus. A point-to-point connector typically exists that knows how to talk to either system with minimum configuration. But what happens when a native connector between applications doesn’t exist? Besides building a custom connector, which may not be a trivial task at all (just think of the effort necessary to understand authentication and API calls, as well as provide the high availability and guarantee of delivery typical of an ESB), what you can do is implement a relay.

Azure Service Bus Relay (bit.ly/2BNTBih) is an extension of Service Bus that facilitates communication among systems in a hybrid configuration by enabling a secure connection to systems not accessible from the public cloud. Let’s assume that the GIS system I’m connecting to is hosted within a private corporate network. Data transfer initiated by the on-premises service connects to the relay through an outbound port, creating a bidirectional socket for communication tied to a particular rendezvous address. The e-commerce application, hosted in Azure, can then communicate with the GIS service behind the firewall by sending messages to the relay service targeting the rendezvous address. The relay service then “relays” data to the on-premises service through a dedicated bidirectional socket. The e-commerce application doesn’t need (and can’t establish) a direct connection to the on-premises GIS service, and it doesn’t even need to know where the service resides, as the entire communication is only to the relay.

Just to mention another advantage of implementing a solution based on Azure Relay, the relay capabilities differ from network-level integration technologies such as VPNs because they can be scoped to a single application endpoint on a single machine. VPN technology, in contrast, is far more intrusive as it relies on altering the network environment.

The NuGet package Microsoft.Azure.Relay contains the namesake namespace with the relevant objects for managing communications with Azure Service Bus Relay. But let’s start by defining the GIS server first, which consists of:

  • GisObject: An object for storing geo-coordinates (latitude and longitude) and a fully resolved location address.
  • GisProcess: A process that maintains a bidirectional connection to the GIS server, via Azure Relay, and transfers instances of GisObject between the GIS server and the e-commerce application.
  • ServerListener: An extension to the GIS server that acts as a bridge between the GIS server itself and Azure Relay.

The bidirectional connection is maintained in multiple steps:

First, I create a hybrid connection client to Azure Relay, using an access security key obtained from the Azure portal:

var tokenProvider =
  TokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key);
var client = new HybridConnectionClient(
  new Uri($"sb://{relayNamespace}/{connectionName}"), tokenProvider);
var relayConnection = await client.CreateConnectionAsync();

Once the connection is established, I run two asynchronous tasks in sequence: The first task sends an instance of GisObject with latitude and longitude coordinates to the relay; the second task reads this object back from the relay. At the conclusion of both tasks, the hybrid connection is closed:

await new Task(
  () => SendToRelay(relayConnection, gisObject)
  .ContinueWith(async (t) =>
  {
    GisObject resolved = await ReadFromRelay(relayConnection);
    ShowAddress(resolved);
  })
  .ContinueWith(async (t) =>
    await relayConnection.CloseAsync(CancellationToken.None))
  .Start());

Sending an object to Azure Relay is a matter of writing a message to a stream. You can serialize the object in several formats; typically this is done in JSON:

private async Task SendToRelay(HybridConnectionStream relayConnection,
  GisObject gisObject)
{
  // Write the GIS object to the hybrid connection
  var writer = new StreamWriter(relayConnection) { AutoFlush = true };
  string message = JsonConvert.SerializeObject(gisObject);
  await writer.WriteAsync(message);
}

Similarly, reading an object from Azure Relay involves reading from a stream and de-serializing the obtained string of characters into the original object type:

private async Task<GisObject> ReadFromRelay(HybridConnectionStream relayConnection)
{
  // Read the GIS object from the hybrid connection
  var reader = new StreamReader(relayConnection);
  string message = await reader.ReadToEndAsync();
  GisObject gisObject = JsonConvert.DeserializeObject<GisObject>(message);
  return gisObject;
}

The GIS server also listens to traffic passing through the relay, reads incoming messages containing an instance of the serialized GisObject, and resolves a location address by invoking a specific GIS service (not described in the proposed solution):

private async Task Listen(HybridConnectionListener listener,
  CancellationTokenSource cts)
{
  // Accept the next available, pending connection request
  HybridConnectionStream relayConnection;
  do
  {
    relayConnection = await listener.AcceptConnectionAsync();
    if (relayConnection != null)
    {
      ProcessMessage(relayConnection, cts);
    }
  } while (relayConnection != null);
}

The connection is a fully bidirectional stream. As Figure 5 shows, I add a stream reader and a stream writer to it, which allows me to read the JSON-serialized GIS object and to write it back to the relay after resolving the provided geo-coordinates into a location address.

Figure 5 Reading and Writing the JSON-Serialized GIS Object

private async void ProcessMessage(HybridConnectionStream relayConnection,
  CancellationTokenSource cts)
{
  // Bidirectional streams for reading and writing to the relay
  var reader = new StreamReader(relayConnection);
  var writer = new StreamWriter(relayConnection) { AutoFlush = true };
  while (!cts.IsCancellationRequested)
  {
    // Read a message in input from the relay
    var message = await reader.ReadToEndAsync();
    // Resolve address by invoking a service on the GIS server
    GisObject gisObject =
      JsonConvert.DeserializeObject<GisObject>(message);
    await new GisServer().ResolveAddressAsync(gisObject);
    // Write the message back to the relay
    message = JsonConvert.SerializeObject(gisObject);
    await writer.WriteLineAsync(message);
  }
  await relayConnection.CloseAsync(cts.Token);
}

The Correlation Pattern

There is one more requirement to meet: I need to share customer data with partner organizations. But I don’t want to disclose infor­mation that partners aren’t authorized to access. I need to implement a way to sync data among systems only if they’re correlated with each other.

The Correlation pattern, focuses on the intersection of two datasets and performs a synchronization of that scoped dataset only if a record exists in both systems. While the relay communication with the GIS server would create a new record if the object couldn’t be found in the system, implementing data integration based on the correlation pattern strictly requires that correlated records exist in both systems for the sync to happen. This applies perfectly to my case where I want to share data with marketing partners, but only if they already have this information in their own system. But there’s a clear challenge here—how can I identify related records that represent the same entity (customer) across systems? This condition defines whether customer records can be synced with external partners.

As shown in Figure 6, the data correlation workflow in the e-commerce application sends a customer record with some marketing information to an Azure Service Bus topic. The customer record is an aggregation of data from multiple entities. It’s not advisable to use the same object (a database entity) as a data transfer object (DTO), as this would create a dependency between the service and the data model in the source application. The brokered message is also decorated with a correlation ID that identifies the specific record in a topic subscription; this correlation ID will be useful later in the partner application for verifying whether a customer record already exists.

Figure 6 The Correlation Class

public class Correlation
{
  private async Task Execute(CustomerEntity customer)
  {
    // Map the Customer entity in the e-commerce application (source)
    // to Customer record in the partner application (destination)
    CustomerRecord customerRecord = PrepareCustomerRecord(customer);
    // Create a connection to an Azure Service Bus Topic
    // Serialize the customer record and send the message to the Topic
    var client = TopicClient.CreateFromConnectionString(
      connectionString, topicName);
    var message = new BrokeredMessage(
      JsonConvert.SerializeObject(customerRecord));
    // Register the customer record with the Correlation Service
    // and obtain a Correlation ID
    message.Properties["CorrelationId}"] =
      new CorrelationService().RegisterCustomer(customerRecord, subscriptionName);
    await client.SendAsync(message);
  }

The correlation service simply exposes methods for matching customer records on a specific subscription, and registering a new customer and returning its correlation ID:

public class CorrelationService
{
  public Guid RegisterCustomer(CustomerRecord record, string subscription)
  {
    return store.ContainsKey((record, subscription)) ?
      GetCustomerCorrelationId(record, subscription) :
      AddCustomer(record, subscription);
  }
  public bool CustomerExists(Guid correlationId)
  {
    return store.ContainsValue(correlationId);
  }

Partner applications subscribe to that topic and retrieve the customer record and the correlation ID, as shown in Figure 7. If the customer record exists in their system, it can be saved eventually.

Figure 7 The Partner Class

class Partner
{
  public void ReceiveCustomerRecord()
  {
    var client = SubscriptionClient.CreateFromConnectionString(
      connectionString, topicName, subscriptionName);
    client.OnMessageAsync(async message =>
    {
      CustomerRecord customerRecord =
        JsonConvert.DeserializeObject<CustomerRecord>(message.GetBody<string>());
      Guid correlationId = (Guid)message.Properties["CorrelationId"];
      if (CustomerRecordExists(correlationId))
      {
        await SaveAsync(customerRecord);
      }
    });
  }

The entire solution is available free to download from my GitHub repository at bit.ly/2s0FWow.


Stefano Tempesta is a Microsoft MVP and MCT, and chapter leader of CRMUG Switzerland. A regular speaker at international conferences, including Microsoft Ignite, Tech Summit and Developer Week, Stefano’s interests extend to Office & Dynamics 365, Blockchain and AI-related technologies.

Thanks to the following Microsoft technical expert for reviewing this article: Massimo Bonanni
Massimo Bonanni is senior consultant in the Modern Apps team of Microsoft and has worked for twenty years with Microsoft technologies. He is a founding member of Italian user group DomusDotNet and dotNET{podcast}. He was a Microsoft MVP, and now is an Intel Software Innovator and Intel Black Belt.


Discuss this article in the MSDN Magazine forum