August 2017

Volume 32 Number 8

[Azure]

Batch Processing Using a Serverless Architecture

By Joseph Fultz | August 2017

For Azure enterprise customers, a key challenge in managing their cloud footprint is the ability to control what they spend and to charge those costs back to the consumers. Fortunately, there are several vendors that provide tools, such as Cloud Cruiser, Cloudyn, and Cloudability, to help with collecting usage data and generating a rich set of reports. Additionally, you can find many good examples of how to pull data programmatically, such as the post from a former co-worker of mine, Ed Mondek, in which he shows how to pull data into Excel and view it (bit.ly/2rzDOPI). However, if you want to pull that data regularly and enable historical, present trend and predictive views, you need to store a lot more data. For a large enterprise with thousands of resources per subscription, that amount of data can be daunting and is certainly not what you’d want to fetch and keep on a local machine.

Luckily, there's another way. In this article I'm going to walk you through the serverless Extract-Transform-Load (ETL) process I set up to extract such data, provide a little enrichment and store the data to a place where further work (analytics, map-reduce and so forth) can be done. I'll touch on the overall design, key decision points and important things to consider in taking a serverless approach.

Determining Consumption

The first decision is choosing between the Enterprise Agreement (EA) Billing API and the Azure Billing API, which centers its requests around specific subscriptions. My prototype is targeted at enterprise customers with multiple enrollments in an EA. In the scenario with which I’m working, subscriptions are being used as part of the management boundaries for both specific product groups and for separating production from non-production resources. This could result in a fairly high number of subscriptions in flux due to the volatile proof-of-concept (PoC) type of work being created as new groups and new product lines start up in Azure. Thus, I chose to work with the EA API because it reduced the scope of work in that I don’t have to create a discovery mechanism for subscriptions. This leaves me with the noted challenge of not having any data for subscriptions created outside of the enrollments for the enterprise. While this is an important area to tackle, it comes with a number of other process and management challenges that have to be solved organizationally and is outside the scope of the work I want to accomplish.

Requirements and Logical Flow

In any architecture, it’s the intersections between systems that require the most scrutiny in design and testing. A serverless architecture doesn’t change the need to consider the volume of data that moves through the supersystem, and you must still take into account the particular constraints of the discrete subsystems. The principal change in architecting such a supersystem is in the depth or scope of the system definition: you size a queue for throughput, for example, but not the hardware that hosts it. You must still consider latency, connectivity, volume, availability, cost, and any number of other factors, but the work of sizing and defining the particulars of the service ends once you’ve defined the capacity and the cost of the capability needed to meet the identified requirements. There’s no additional work of defining the host environment and all its needed artifacts as you might have done in the past.

Before I get into designing what the overall flow of information into the system will look like, let’s note a few facts about the source systems and some requirements for the end-state system:

  • All of the data for every subscription under the EA will be returned for all resources for every day it's available in the designated month. This can result in a lot of data, with a linear growth as the month progresses.
  • Any and all records may be updated throughout the month. The stated settlement timing is 72 hours. As a point of safety, I'll consider all records in flux for a given month until 72 hours past the beginning of the subsequent month.
  • The usage data isn't returned with an ID for the enrollment, so I'll have to add it.
  • Determining cost is a separate activity and requires retrieving the rate card and further processing.
  • No information will be received for subscriptions that aren't in the specified EA.
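The settlement rule in the list above can be expressed as a small date calculation. What follows is a minimal sketch and not code from the prototype: the class name SettlementWindow and the method MonthsInFlux are illustrative, and the only rule encoded is the stated one, that the previous month stays in flux until 72 hours after the current month begins.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class SettlementWindow
{
    // Assumption: the 72-hour settlement period stated in the requirements.
    private static readonly TimeSpan Settlement = TimeSpan.FromHours(72);

    // Returns the "yyyy-MM" months whose records should still be treated as in flux.
    public static List<string> MonthsInFlux(DateTime utcNow)
    {
        var firstOfMonth = new DateTime(utcNow.Year, utcNow.Month, 1, 0, 0, 0, DateTimeKind.Utc);
        var months = new List<string>();

        // The previous month remains in flux until 72 hours into the current month.
        if (utcNow < firstOfMonth + Settlement)
        {
            months.Add(firstOfMonth.AddMonths(-1).ToString("yyyy-MM", CultureInfo.InvariantCulture));
        }

        // The current month is always in flux.
        months.Add(firstOfMonth.ToString("yyyy-MM", CultureInfo.InvariantCulture));
        return months;
    }
}
```

Under this sketch, a run on the second day of a month would fetch both the previous and the current month, while a run on the fifth day would fetch only the current one.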

Additionally, there are a few technical business requirements that the prototype must include:

  • The ability to create read-only and geographically distributed datasets must be included.
  • Processing performance should be adjustable for cost versus performance.
  • The ability to secure access at the subscription level should be designed in.

The overall flow itself is fairly simple in that I'm simply going to retrieve the data, add a small amount of information and persist it into the target storage.

As depicted in Figure 1, the path for getting the data to its target is fairly simple because there's no integration with any external systems other than the EA Billing API. I know that when I work through the data, I'll have to do some amount of initial processing and enrichment (for example, add the enrollment ID), and on the persistence side I'll have to deal with existing records from the previous day's fetches. I'll probably want to look at separating those two processes.

Figure 1 Logical Flow

Thus, you see three major blocks that represent retrieval, enrichment and persistence, which are all separated by some queuing mechanism. The complications start after I make some technology picks and start looking at the details of implementing with those components and making the processing pipeline run in parallel.

Technology Mapping

At this point in the process, two factors beyond the requirements of the overall system may come into play: enterprise standards and personal preference. If these are in conflict, the result can be almost endless debate. Fortunately, in this instance I don’t have to worry about this. I do have my own mix of constraints, along with those I noted from the initial requirements. In this case, I’d like to make sure to hit these marks:

  • Simplest compute provisioning and edits/updates for quick cycles of testing
  • Easy automatic scaling
  • Easy provisioning for geographic distribution of data
  • Easy mechanisms for scheduling and triggering work

Here, I want to focus on the work and not on the system setup. I'll leave things like cost analysis for various implementations and adherence to corporate standards until after I have a working prototype. I did consider some alternatives, such as Azure SQL Database versus Azure Cosmos DB, but I'm going to focus on my choices and the primary motivations for each of those choices.

  • Compute: Azure Functions will serve me well here. It meets my need for scale and simplicity while also providing easy configuration of scheduled and triggered jobs and easy integrations with bindings.
  • Queuing: Keeping things simple, I'll use Azure Storage Blobs and separate the files by containers. The unknown but expectedly large size of each initial input file makes storage queues a non-option for initial retrieval, and likely takes them out of the running for processing individual subscription data splits. Beyond that, I'd like to keep the mechanism uniform and I really don't need any advanced capabilities, such as priority messages, routing, message-specific security and poisoned message handling.
  • Storage: Azure Cosmos DB is indeed my friend here. Using the subscription ID as the partition key allows me to limit access by subscription, if necessary. Additionally, the ease of adding and removing read and read-write geographically distributed replicas and native support in Power BI makes this a no-brainer for my system. Last, I have to admit a little personal bias: I want a proper document storage mechanism that supports the SQL syntax I've used for too many years to abandon.

Figure 2 represents the application of technology to the logical architecture, as well as adding some processing flow to it.

Figure 2 Technology Map and Data Flow

I've taken the liberty of including the names I used in this diagram, but you might not have names at this stage of the design. The shapes used indicate the technology in play; the numbers on the line are the sequence in which the process is executed, and the arrows indicate which component initiates the outbound call. Note that I've identified four Azure Functions, four Azure Storage Blob Containers and three Azure Cosmos DB collections that I'll employ as the working pieces of my implementation.

Separating the data into three collections is useful for explaining, but serves a grander purpose. I won't need the same security for each of the types of documents and the separation makes that easy to understand and manage. More important, I define the performance characteristics by collection and the separation allows me to more easily optimize that by having a large high-throughput collection specifically for the DetailedUsageData, while the other two remain minimal.

Retrieving Data

Starting with the first two legs of the data journey, I want to run something similar to what I do with a Cron job. While the WebJobs SDK itself would support this type of implementation, it would leave a lot of work of configuring the runtime environment to me and increase my overall development effort. Because Azure Functions is built on top of the WebJobs SDK and naturally supports Timer Trigger, it's an easy choice. I could've used Azure Data Factory because it's a tool made specifically for moving data around and it supports retrieving Web data and working with Blobs. However, that would mean I'd need to work out certain things with regard to reference data and updating duplicate records in Azure Cosmos DB when I don't have the row ID. Familiarity with development and debugging using Azure Functions, and the information I can get from Azure Functions integration with Application Insights, makes Azure Functions my preferred choice in this instance.

The Timer Trigger has an obvious function, but in order for DailyEABatchControl to know what to process, it retrieves configuration information from the Enrollments collection, which has the following schema:

{
  "enrollmentNumber": "<enrollment number>",
  "description": "",
  "accessKey": "<access key>",
  "detailedEnabled": "true",
  "summaryEnabled": "false"
}
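A class that mirrors this schema might look like the following. This is a sketch only: the article doesn't show the Enrollment type, so the property list is inferred from the JSON above, and the IsDetailedEnabled helper is a hypothetical convenience that accounts for the flags being stored as strings rather than Booleans.

```csharp
using System;

// Hypothetical shape for an Enrollments document; property names mirror the JSON schema.
public class Enrollment
{
    public string enrollmentNumber { get; set; }
    public string description { get; set; }
    public string accessKey { get; set; }
    public string detailedEnabled { get; set; }
    public string summaryEnabled { get; set; }

    // The flags are stored as text ("true"/"false"), so compare strings
    // instead of relying on a Boolean conversion. A missing flag reads as false.
    public bool IsDetailedEnabled =>
        string.Equals(detailedEnabled, "true", StringComparison.OrdinalIgnoreCase);
}
```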

For now, having the enrollment number, access key and a flag to turn on processing ("detailedEnabled") is sufficient for me to do work. However, should I start adding capabilities and need additional run configuration information, Azure Cosmos DB will allow me to easily add elements to the document schema without having to do a bunch of reworking and data migration. Once the DailyEABatchControl is triggered, it will loop through all of the documents and call RetrieveUsage for each enrollment that has "detailedEnabled" set to true, separating the logic to start a job from the logic to retrieve the source data. I use the JobLog collection to determine if a job has already been run for the day, as shown in Figure 3.

Figure 3 Job Control Logic

// Get list of enrollments for daily processing
List<Enrollment> enrollments = 
  inputDocument.CreateDocumentQuery<Enrollment>(
  UriFactory.CreateDocumentCollectionUri(dbName, enrollmentCollection), 
  new SqlQuerySpec("SELECT * FROM c WHERE c.detailedEnabled = 'true'"),
  queryOptions).ToList<Enrollment>();

// Get the epoch for 24 hours ago; job logs newer than this count as today's run
int comparisonEpoch = 
  (int)(DateTime.UtcNow.AddDays(-1) - new DateTime(1970, 1, 1)).TotalSeconds;

string logQuery = 
  "SELECT * FROM c WHERE c.epoch > '" + comparisonEpoch.ToString() + "'";

List<JobLog> logs = inputDocument.CreateDocumentQuery<JobLog>(
  UriFactory.CreateDocumentCollectionUri(dbName, jobLogCollection), 
  new SqlQuerySpec(logQuery), queryOptions).ToList<JobLog>();

// Get list of enrollments for which there is no match
var jobList = enrollments.Where(x =>
  !logs.Any (l => l.enrollmentNumber == x.enrollmentNumber));

The last lambda results in a filtered list of enrollments for which data hasn't been retrieved for the day in question. Next, I'll call RetrieveUsage (step 3 in Figure 2) from within DailyEABatchControl, using HttpClient with sufficient data in the post body for it to know the enrollment for which it's fetching data and the month for which it's fetching it, as shown in Figure 4.

Figure 4 Retrieving Usage Data

foreach(var doc in jobList)
{
  HttpClient httpClient = new HttpClient();

  string retrieveUsageUri = @"https://" + 
    System.Environment.GetEnvironmentVariable("retrieveUsageUri");

  string postBody = "{\"enrollment\":\"" + doc.enrollmentNumber + "\"," +
    "\"month\":\"" + DateTime.Now.ToString("yyyy-MM") + "\"}";

  httpClient.DefaultRequestHeaders.Accept.Add(
    new MediaTypeWithQualityHeaderValue("application/json"));

  var content = new StringContent(postBody, Encoding.UTF8, "application/json");
  var response = await httpClient.PostAsync(retrieveUsageUri, content);

  response.EnsureSuccessStatusCode();
 
  string fetchResult = await response.Content.ReadAsStringAsync();
}

It's worth pointing out that this isn't intended to be an open system. I'm creating a closed processing loop so I don't want just any caller executing the RetrieveUsage Function. Thus, I've secured it by requiring a code that's not shown in Figure 4, but is part of the URI returned from GetEnvironmentVariable("retrieveUsageUri"). In an enterprise implementation, a service principal and Azure Active Directory integration would be a more realistic choice to achieve a higher degree of security.

The last step of the first leg of my data's journey is within the RetrieveUsage function, where the data is persisted to the newdailyusage container in Azure Blob Storage. However, in order to get that data I have to construct the call and include the accessKey as a bearer token in the header:

HttpClient httpClient = new HttpClient();

string retrieveUsageUri = usageQB.FullEAReportUrl();

httpClient.DefaultRequestHeaders.Add("authorization", bearerTokenHeader);
httpClient.DefaultRequestHeaders.Add("api-version", "1.0");

var response = await httpClient.GetAsync(retrieveUsageUri);    

response.EnsureSuccessStatusCode();

string responseText = await response.Content.ReadAsStringAsync();

For the sake of brevity, I've cut some date manipulations out of this code block and haven't included a helper class for generating the bearerTokenHeader or the UsageReportQueryBuilder. However, this should be sufficient to illustrate how they're used and ordered. The accessKey is passed into the static method FromJwt, which will return the BearerToken type, from which I simply grab the header and add it to the request that's created from the URL constructed by the call to usageQB.FullEAReportUrl. Last, I update the output binding to the path and filename I want for the Blob target:

path = "newdailyusage/" + workingDate.ToString("yyyyMMdd") 
  + "-" +  data.enrollment + "-usage.json";
var attributes = new Attribute[]
{
  new BlobAttribute(path),
  new StorageAccountAttribute("eabillingstorage_STORAGE")
};

using (var writer = await binder.BindAsync<TextWriter>(attributes))
{
  writer.Write(responseText);
}

This will result in a structure in Azure Storage that looks like this:

newdailyusage/
        20170508-1234-usage.json
        20170508-456-usage.json
        20170507-123-usage.json

This allows me to store data for multiple enrollments and multiple files for each enrollment in case processing doesn't happen for some reason. Additionally, because data can change for previous days as the month progresses, it's important to have the files available for research and reconciliation in case anomalies show up in the report data.
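Because the date and enrollment are encoded in the blob name, downstream steps can recover them from the path. The sketch below illustrates that naming convention; the UsageBlobName class is hypothetical, and the parser assumes that enrollment numbers never contain a hyphen.

```csharp
using System;
using System.Globalization;

public static class UsageBlobName
{
    // Builds "newdailyusage/yyyyMMdd-<enrollment>-usage.json", matching the layout above.
    public static string Build(DateTime workingDate, string enrollment)
    {
        return "newdailyusage/"
            + workingDate.ToString("yyyyMMdd", CultureInfo.InvariantCulture)
            + "-" + enrollment + "-usage.json";
    }

    // Recovers the date and enrollment from a blob path.
    // Assumption: the enrollment number itself contains no hyphen.
    public static bool TryParse(string path, out DateTime workingDate, out string enrollment)
    {
        workingDate = default(DateTime);
        enrollment = null;

        string name = path.Substring(path.LastIndexOf('/') + 1);
        string[] parts = name.Split('-');
        if (parts.Length != 3 || parts[2] != "usage.json")
        {
            return false;
        }

        if (!DateTime.TryParseExact(parts[0], "yyyyMMdd", CultureInfo.InvariantCulture,
            DateTimeStyles.None, out workingDate))
        {
            return false;
        }

        enrollment = parts[1];
        return true;
    }
}
```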

Splitting Data for Parallel Processing

With so much data coming in and the work of somehow updating records for a given month of processing each day, it's important to process this data in a parallel fashion. Usually, at least nowadays, this is when I break out the parallel libraries for C#, write a few lines of code and pat myself on the back for being a genius at parallel processing. However, in this instance, I'd really like to just rely on the capabilities of the platform to do that for me and allow me to focus on each discrete task.

The next Azure Function in the sequence has been configured with a blob trigger so it will pick up files that land in the inbound processing storage container. The job at this step is to split the inbound file into a file-per-day per enrollment. All in all, this is a pretty simple step, but it does require deserializing the JSON file into RAM. It's important to note this, because the method I've chosen to use for the prototype simply calls the deserialize method:

JsonConvert.DeserializeObject<List<EAUsageDetail>>(myBlob);

I know this to be sufficient for my purposes, but the present RAM allocation for the Azure Function host is 1.5GB. It's possible that, for a large enrollment with substantial resources provisioned, a file would become too big at some point in the month to load into RAM, in which case an alternate method for parsing and splitting the file will have to be used. Moreover, an Azure Function that takes more than five minutes to run will have to be modified, because the current default timeout is five minutes, though this can be raised to a maximum of 10 minutes via the host configuration JSON. As I mentioned early on, knowing the volume of data will be key at each point and for integration in the overall system. Once the data has been deserialized, I'll grab the max day out of it and set up a loop from day one to day max to start selecting out the data for each of those days, as shown in Figure 5.

Figure 5 Selecting Each Day’s Data

// Loop through collection filtering by day
for(int dayToProcess = 1; dayToProcess <= maxDayOfMonth; dayToProcess++)    
{
  // Get documents for current processing day
  var docsForCurrentDay = results.Where (d => d.Day==dayToProcess);

  // Serialize to string
  string jsonForCurrentDay = 
    JsonConvert.SerializeObject(docsForCurrentDay);
  log.Info($"***** Docs for day {dayToProcess} *****");

  // Get date for one of the records for today
  string processDateString = (from docs in results where docs.Day == 
    dayToProcess select docs.Date).First();

  path = "newdailysplit/" + DateTime.Parse(processDateString).ToString("yyyyMMdd") 
    + "-" +  enrollment + "-dailysplit.json";

  // Write out each day's data to a file in the "newdailysplit" container
  var attributes = new Attribute[]
  {
    new BlobAttribute(path),
    new StorageAccountAttribute("eabillingstorage_STORAGE")
  };

  using (var writer = await binder.BindAsync<TextWriter>(attributes))
  {
    writer.Write(jsonForCurrentDay);
  }
}
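If a usage file ever outgrows the memory available to the Function host, one possible alternative to deserializing the whole array at once is to stream it one record at a time with the Json.NET JsonTextReader. The following is a sketch of that technique and not the prototype's code: UsageRecord is a stand-in for the article's EAUsageDetail type, and only the two fields used in Figure 5 are modeled.

```csharp
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;

// Stand-in for EAUsageDetail; only the fields referenced in Figure 5 are included.
public class UsageRecord
{
    public int Day { get; set; }
    public string Date { get; set; }
}

public static class UsageStream
{
    // Lazily yields each object of a top-level JSON array without
    // materializing the whole array in memory.
    public static IEnumerable<T> ReadArray<T>(TextReader text)
    {
        using (var reader = new JsonTextReader(text))
        {
            var serializer = new JsonSerializer();
            while (reader.Read())
            {
                // Deserialize consumes the complete object, nested content included,
                // so the next Read lands on the following element or the array end.
                if (reader.TokenType == JsonToken.StartObject)
                {
                    yield return serializer.Deserialize<T>(reader);
                }
            }
        }
    }
}
```

Each record could then be appended to a writer for its day as it's read, so memory use tracks a single record rather than the whole month's file.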

Once all the days have been split into separate files and written out (see step 7 in Figure 2), I simply move the file to the processedusage container. To keep the diagram in Figure 2 easy to parse, I've omitted some containers; in particular, the error files container is missing from the diagram. This is the container that holds any file that causes an exception during processing, whether that file is the entire usage file or just one of the daily splits. I don't spend time or effort correcting the data for missing or errored days because, once an issue is identified, the process can be triggered for a given month and enrollment or for a single daily split to correct the problem. Also clearly missing from the prototype are alerting and compensating mechanisms for when errors occur, but that's something I want to bubble up through Application Insights integration with the Operations Management Suite.

Persisting the Data to Azure Cosmos DB

With the files split and ready to be picked up by the ProcessDailyUsage Function, it's time to consider some issues that need to be addressed, namely throughput to the target and how to handle updates. Often when working through some solution architecture in an enterprise, you run into older systems that are less capable, or where real-time loads and high-throughput scenarios need to be managed. I don't naturally have any hard throughput constraints in my cloud native setup for this architecture, but I could create problems for myself if I don't take the time to think through the volume and speed of the data I'm feeding into the cloud services I'm consuming.

For my data set, each of the daily splits is about 2.4MB and contains about 1,200 individual documents. Keep in mind that each document represents one meter reading for one resource provisioned in Azure. Thus, for each EA the number of documents in a daily split could vary greatly depending on resource usage across the enterprise. The ProcessDailyUsage Function is configured to trigger based on receiving new blobs in the newdailysplit container. This means I'll have as many as 31 concurrent Function executions manipulating the data. To help me estimate what I need to provision for Azure Cosmos DB, I used the calculator at documentdb.com/capacityplanner. Without some empirical testing I had to make a few guesses for the initial provisioning. I know there will be 31 concurrent executions, but it's a little harder to nail down how many concurrent requests per second that will create without doing repetitive runs. The end result of this prototype will help to inform the final architecture and requirements for provisioning, but because I'm working forward on this timeline, I'm going to take a stab at it using the following as my rules for estimating:

  • 1,200 records
  • 31 concurrent executions (for a single EA)
  • 0.124 seconds per request (empirical evidence from measuring a few individual requests)

I'll round down to 0.1 seconds for a more conservative estimate, thus overestimating the load. This nets 310 requests per second per EA, which in turn comes out to about 7,800 request units (RUs) based on the calculator results, as can be seen in Figure 6.

Figure 6 Azure Cosmos DB Pricing Calculator
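The request-rate part of this estimate is simple arithmetic: the number of concurrent executions divided by the time each request takes. The sketch below works it through, assuming each execution issues its requests one at a time; the ThroughputEstimate class is illustrative, and converting the rate to RUs is still left to the capacity calculator.

```csharp
using System;

public static class ThroughputEstimate
{
    // Requests per second across all executions, assuming each execution
    // sends one request at a time and waits for it to complete.
    public static decimal RequestsPerSecond(int concurrentExecutions, decimal secondsPerRequest)
    {
        if (secondsPerRequest <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(secondsPerRequest));
        }

        return concurrentExecutions / secondsPerRequest;
    }
}
```

With the rounded figure, RequestsPerSecond(31, 0.1m) gives the 310 requests per second used for provisioning. With the measured 0.124 seconds it gives 250, which shows how much headroom the rounding adds.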

Because the maximum RUs that can be provisioned without calling support is 10,000, this might seem kind of high. However, I'm running an unthrottled parallel process and that drives up the throughput significantly, which in turn will drive up the cost. This is a major consideration when designing the structure because it's fine for me to run this for some testing, but for the real solution I'll need a throttling mechanism to slow down the processing so I can provision fewer RUs and save myself a little money. I don't need the data to be captured as fast as possible, just within a reasonable enough time that someone could review and consume it on a daily basis. The good news is that the Azure Functions team has a concurrency control mechanism in the backlog of issues that will eventually get resolved (bit.ly/2tcpAbI), and will provide a good means of control once implemented. Some other options are to introduce artificial arbitrary delays (let's all agree this is bad) or to rework the processing and handle the parallel execution explicitly in the C# code. Also, as technical expert Fabio Cavalcante pointed out in a conversation, another good option would be to modify the architecture a bit by adding Azure Storage Queues and using features such as visibility timeouts and scheduled delivery to act as a throttling mechanism. That would add a few moving parts to the system and I'd have to work out the interaction of using a queue for activation while keeping the data in storage, or slice up the data in 64KB blocks for the queue. Once throttling is available in Azure Functions, I'll be able to keep it in this simpler form with which I'm working. The salient point here is that when working with a serverless architecture you must be familiar with the constraints of the platforms on which you're building, as well as the cost of each decision.
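The option of handling the parallel execution explicitly in C# could take the form of a bounded-concurrency loop. The following is a minimal sketch of that idea using SemaphoreSlim, not a feature of Azure Functions and not the prototype's code; the Throttle class and the maxConcurrency parameter are hypothetical. It assumes a single Function invocation processes all the daily splits itself, rather than one invocation being triggered per blob.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttle
{
    // Runs work for every item, but never more than maxConcurrency at once.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        if (maxConcurrency < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        }

        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            // ToList starts every task; each one waits at the gate for a free slot.
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync();
                try
                {
                    await work(item);
                }
                finally
                {
                    gate.Release();
                }
            }).ToList();

            await Task.WhenAll(tasks);
        }
    }
}
```

Lowering maxConcurrency trades elapsed time for a lower peak request rate, which is the lever needed to provision fewer RUs.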

When provisioning more than 2,500 RUs, the system requires that a partition key be specified. This works for me, because I want to partition that data in any case to help with both scale and security in the future.

As you can see in Figure 7, I've specified 8,000 RUs, which is a little more than the calculation indicated, and I've specified SubscriptionId as the partition key.

Figure 7 Provisioning a New Collection

Additionally, I set up the ProcessDailyUsage with a blob trigger on the newdailysplit container and with an input and output binding for Azure Cosmos DB. The input binding is used to find the records that exist for the given day and enrollment and to handle duplicates. I'll ensure that my FeedOptions sets the cross-partition query flag, as shown in Figure 8.

Figure 8 Using FeedOptions to Set the Cross-Partition Query Flag

string docsToDeleteQuery = String.Format(@"SELECT * FROM c where c.Enrollment = 
  ""{0}"" AND c.Date = ""{1}""", enrollment, incomingDataDate); 

FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, 
  EnableCrossPartitionQuery = true };

IQueryable<Document> deleteQuery = docDBClient.CreateDocumentQuery<Document>(
  UriFactory.CreateDocumentCollectionUri(dbName, collectionName), 
  new SqlQuerySpec(docsToDeleteQuery), queryOptions);

log.Info("Delete documents");
int deletedDocumentCount = 0;
foreach (Document doc in deleteQuery)
{ 
  await docDBClient.DeleteDocumentAsync(((dynamic)doc)._self,  
    new RequestOptions { PartitionKey = 
    new PartitionKey(((dynamic)doc).SubscriptionId) });
  deletedDocumentCount++;
}

I create a query to grab all the records for the enrollment on that date and then loop through and delete them. This is one instance where Azure SQL Database could've made things easier by issuing a DELETE query or by using an upsert with a known primary key. However, in Azure Cosmos DB, to do the upsert I need the row ID, which means I must make the round trip and do the comparison on fields I know to uniquely identify the document and then use that row's id or selflink. For this example, I simply delete all the records and then add the new (and potentially updated) documents back in. To do this I need to pass in the partition key to the DeleteDocumentAsync method. An optimization would be to pull the documents back and do a local comparison, update any changed documents and add net new documents. It's a little taxing, because all of the elements in each document must be compared. Because there's no primary key defined for the billing documents, you can likely find the matched document using SubscriptionId, MeterId, InstanceId and Date and compare the rest of the elements from there. This would offload some of the work from Azure Cosmos DB and reduce the overall traffic.
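The local comparison described above might be sketched as follows. This isn't part of the prototype: UsageDoc, UsageDiff and UsageComparer are hypothetical types, ConsumedQuantity stands in for "the rest of the elements" that a real comparison would cover, and the composite key assumes that none of the four identifying fields contains the "|" separator.

```csharp
using System.Collections.Generic;

// Hypothetical, trimmed-down billing document.
public class UsageDoc
{
    public string SubscriptionId { get; set; }
    public string MeterId { get; set; }
    public string InstanceId { get; set; }
    public string Date { get; set; }
    public decimal ConsumedQuantity { get; set; } // stand-in for the remaining fields
}

public class UsageDiff
{
    public List<UsageDoc> ToAdd = new List<UsageDoc>();
    public List<UsageDoc> ToUpdate = new List<UsageDoc>();
}

public static class UsageComparer
{
    // Composite key built from the fields suggested as a unique identity.
    private static string Key(UsageDoc d)
    {
        return string.Join("|", d.SubscriptionId, d.MeterId, d.InstanceId, d.Date);
    }

    // Sorts incoming documents into net new and changed; unchanged ones are skipped.
    public static UsageDiff Compare(IEnumerable<UsageDoc> existing, IEnumerable<UsageDoc> incoming)
    {
        var byKey = new Dictionary<string, UsageDoc>();
        foreach (var doc in existing)
        {
            byKey[Key(doc)] = doc;
        }

        var diff = new UsageDiff();
        foreach (var doc in incoming)
        {
            UsageDoc match;
            if (!byKey.TryGetValue(Key(doc), out match))
            {
                diff.ToAdd.Add(doc);
            }
            else if (match.ConsumedQuantity != doc.ConsumedQuantity)
            {
                diff.ToUpdate.Add(doc);
            }
        }

        return diff;
    }
}
```

Only the documents in ToAdd and ToUpdate would then be written, instead of deleting and re-adding every record for the day.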

With the way cleared to add the documents back into the collection, I simply loop through the docs and call AddAsync on the documentCollector I defined as the output binding for the Azure Function:

// Update the enrollment field in the incoming collection
incomingDailyUsage.ForEach (usage => usage.Enrollment = enrollment); 

int processedRecordCount=0;
foreach (EnrollmentUsageDetail usageDoc in incomingDailyUsage)
{

  await documentCollector.AddAsync(usageDoc);
  processedRecordCount++;
}

While it's not much of a change, I've also done a little bit of enrichment by adding the Enrollment number to each document in the collection. Running one daily split file produces the log information shown in Figure 9.

Figure 9 The Log Information from a Daily Split File

2017-06-10T01:16:55.291 Function started (Id=bfb220aa-97ab-4d36-9c1e-602763b93ff0)
2017-06-10T01:16:56.041 First 15 chars: [{"AccountOwner
2017-06-10T01:16:56.181 get date
2017-06-10T01:16:56.181 getting enrollment
2017-06-10T01:16:56.181 Incoming date: 11/01/2016 for Enrollment: 4944727
2017-06-10T01:16:56.181 Collection: partitionedusage
2017-06-10T01:16:56.181 query:  SELECT * FROM c where c.Enrollment = "4944727" AND c.Date = "11/01/2016"
2017-06-10T01:16:56.181 Create delete query
2017-06-10T01:16:56.197 Delete documents
2017-06-10T01:17:23.189 2142 docs deleted while processing 20161101-4944727-dailysplit.json
2017-06-10T01:17:23.189 Import documents
2017-06-10T01:17:44.628 2142 records imported from file 20161101-4944727-dailysplit.json
2017-06-10T01:17:44.628 Moving file 20161101-4944727-dailysplit.json to /processedusage container
2017-06-10T01:17:44.674 Deleting 20161101-4944727-dailysplit.json
2017-06-10T01:17:44.690 Completed!
2017-06-10T01:17:44.690 Function completed (Success, Id=bfb220aa-97ab-4d36-9c1e-602763b93ff0, Duration=49397ms)
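The timestamps in a log like this one can be turned into a rough throughput figure for sizing. The following sketch, with the hypothetical helper LogThroughput, does that arithmetic for a pair of log timestamps; it assumes the timestamp format shown in Figure 9.

```csharp
using System;
using System.Globalization;

public static class LogThroughput
{
    // Documents handled per second between two log timestamps.
    public static double DocsPerSecond(int docs, string startStamp, string endStamp)
    {
        TimeSpan elapsed = Parse(endStamp) - Parse(startStamp);
        return docs / elapsed.TotalSeconds;
    }

    // Assumption: timestamps use the "yyyy-MM-ddTHH:mm:ss.fff" layout seen in the log.
    private static DateTime Parse(string stamp)
    {
        return DateTime.ParseExact(stamp, "yyyy-MM-dd'T'HH:mm:ss.fff",
            CultureInfo.InvariantCulture);
    }
}
```

For the run in Figure 9, the delete phase spans 01:16:56.197 to 01:17:23.189 and the import phase 01:17:23.189 to 01:17:44.628, which works out to roughly 79 and 100 documents per second, respectively, for the 2,142 documents in that split.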

Final Note

The only thing left to do is to run a good many iterations with varying inputs and then measure so I can properly size the services I'm using. This includes testing out the geographic replication capabilities and some further prototyping of the security that I'll want to implement around subscription data access; these were two of the major reasons for choosing Azure Cosmos DB. The net lessons to be gleaned are some of the ones that we seem to keep learning in the world of IT:

  1. There are no magic bullets, not even with a serverless architecture.
  2. Nothing replaces thorough testing.
  3. Size your dependent services and treat this as seriously as you did when sizing your hardware in the past.
  4. Pay close attention to cost, especially under high throughput conditions.

The upside of using serverless compute like Azure Functions is that you pay only for what's consumed. For regular but infrequent processing such as this, that can be a big benefit in cost savings. Finally, configuring capabilities is a better experience and allows faster time to product than configuring host servers.


Joseph Fultz is a cloud solution architect at Microsoft. He works with Microsoft customers developing architectures for solving business problems leveraging Microsoft Azure. Formerly, Fultz was responsible for the development and architecture of GM’s car-sharing program (mavendrive.com). Contact him on Twitter: @JosephRFultz or via e-mail at jofultz@microsoft.com.

Thanks to the following Microsoft technical expert who reviewed this article: Fabio Cavalcante