How to: Create an Agent for Batch Processing

Visual Studio 2010

Applies to: Functional Programming

Published: January 2010

Authors: Tomas Petricek and Jon Skeet

Summary: When receiving messages, the application may put the messages into groups of some maximum size and process them in batches. This article develops an agent that implements this behavior.

This article is associated with Real World Functional Programming: With Examples in F# and C# by Tomas Petricek with Jon Skeet from Manning Publications (ISBN 9781933988924, copyright Manning Publications 2009, all rights reserved). No part of these chapters may be reproduced, stored in a retrieval system, or transmitted in any form or by any means—electronic, electrostatic, mechanical, photocopying, recording, or otherwise—without the prior written permission of the publisher, except in the case of brief quotations embodied in critical articles or reviews.

When writing a system that receives a large number of messages at a very high speed, it may be quite inefficient to process messages one by one, as they arrive. A better option is to receive messages in groups and then process an entire group at once. For example, when inserting data into an SQL database, multiple rows can be inserted at once using bulk insert, which is significantly more efficient than inserting individual rows.

Batch processing is a very common requirement. Instead of implementing it over and over again, the behavior can be encapsulated inside an agent and then reused. This article shows how to implement a BatchProcessingAgent that can be used to add batch processing to concurrent systems in F#. The agent takes the maximum allowed size of a batch and the maximum timeout between batches and behaves as follows:

  • When it receives the allowed number of messages before the specified time, it reports a new batch, resets the timer, and starts waiting for new messages.

  • When the timeout elapses, the agent produces a group of messages received so far (if there are any) and starts waiting again.

The agent can be used in various ways. When the maximum number of messages is very high, the agent just groups the messages received within a specified timeout. When the timeout is large, the agent always waits until it receives a specified number of messages. In practice, it is useful to combine the two options to get reasonably sized groups but also process all messages reasonably quickly.

Figure 1. Grouping messages with the maximum group size of three and a timeout

Figure 1 demonstrates the behavior of the agent. The circles represent messages received, and the lines below denote groups. In the first case, the agent receives three messages within the specified time limit, so it creates a group of three. In the second case, only two messages are received before the timeout, so the second group consists of just two values. Next, the agent receives three messages very quickly, so it creates a new group much sooner.
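The grouping rules illustrated in Figure 1 can be modelled without any concurrency. The following is a minimal sketch and not part of the article's agent: simulateBatches is a hypothetical helper that takes arrival times in milliseconds and applies the same two rules (a full batch or an elapsed timeout closes the group and restarts the timer). The arrival times are invented to mirror the three cases in the figure, and the timeout is assumed to be greater than zero.

```fsharp
/// Hypothetical helper (illustration only): applies the grouping rules to a
/// list of (arrivalTimeMs, value) pairs sorted by arrival time.
/// Assumes timeout > 0.
let simulateBatches batchSize timeout (arrivals: (int * 'T) list) =
    // deadline: time at which the current waiting period expires
    // pending:  messages collected so far (newest first)
    // result:   batches produced so far (newest first)
    let rec loop deadline pending result arrivals =
        match arrivals with
        | [] ->
            // No more input: the next timeout reports whatever is pending
            let result = if List.isEmpty pending then result else List.rev pending :: result
            List.rev result
        | (time, _) :: _ when time > deadline ->
            // Timeout elapsed before the next message: report pending messages
            // (if any) and restart the timer from the old deadline
            let result = if List.isEmpty pending then result else List.rev pending :: result
            loop (deadline + timeout) [] result arrivals
        | (time, value) :: rest when List.length pending = batchSize - 1 ->
            // Batch is full: report it and restart the timer from the arrival time
            loop (time + timeout) [] (List.rev (value :: pending) :: result) rest
        | (_, value) :: rest ->
            // Still room in the batch: keep collecting
            loop deadline (value :: pending) result rest
    loop timeout [] [] arrivals

// Maximum group size of three and a timeout of 1000 ms (made-up arrival times)
let batches =
    simulateBatches 3 1000
        [ (100, 'a'); (400, 'b'); (800, 'c')      // three messages in time
          (1200, 'd'); (1500, 'e')                // only two before the timeout
          (2000, 'f'); (2100, 'g'); (2200, 'h') ] // three messages in quick succession

// batches evaluates to [['a'; 'b'; 'c']; ['d'; 'e']; ['f'; 'g'; 'h']]
printfn "%A" batches
```

A message that arrives exactly at the deadline is treated here as arriving in time; in the real agent that case is a race between the timer and the message.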

The agent implementation is quite simple, but it is split into two sections to make the code more readable. The complete source code can also be downloaded from the link at the end of the article. The following section starts by looking at the declaration of the batch processing agent type. The body of the agent is added later.

Creating the Agent Type

Consider the members that the batch processing agent needs to provide. When created, the agent is given the maximum size of a batch and the maximum timeout between the batches. These two numbers will be the parameters of the agent's constructor.

The agent also needs a member for sending messages, which is called Enqueue. When a batch is generated, the agent reports it back to the caller using an event called BatchProduced. The following snippet shows the structure of the BatchProcessingAgent<'T> type:

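// Agent<'T> is an alias for the F# MailboxProcessor<'T> type
type Agent<'T> = MailboxProcessor<'T>

/// Agent that groups received messages in 
/// groups with a specified maximum size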
type BatchProcessingAgent<'T> (batchSize, timeout) = 
    let batchEvent = new Event<'T[]>()
    let agent : Agent<'T> = Agent.Start(fun agent -> 
        // NOTE: Implementation details omitted
        let rec loop remainingTime messages = (...)
        loop timeout [] )

    /// Triggered when the agent collects a group of messages
    member x.BatchProduced = batchEvent.Publish
    /// Send new message to the agent
    member x.Enqueue(v) = agent.Post(v)

The type follows the usual pattern for creating agent-based types. It creates a new instance of the agent in the constructor. The constructor of the type also creates a new event of the Event<'T[]> type, which is used to report batches of messages to the clients of the type. When triggered, it will carry a (non-empty) array of accumulated messages.

The agent has two public members. The Enqueue method is a simple proxy that sends messages to the agent. The BatchProduced member exposes the event that reports batches. The Publish member of the event returns a value of the IEvent<'T[]> type, representing an event that clients can register with. The next section uses the Trigger method of the event to send batches to all registered clients. This is done in the body of the agent using the rules discussed in the introduction.

Implementing the Agent Body

The body of the batch processing agent can be implemented quite easily using the TryReceive method of the Agent<'T> type. The method takes a timeout and resumes the asynchronous workflow as soon as a message is received or after the specified timeout. The method returns an option value that can be processed using pattern matching. Pattern matching makes it easy to recognize all possible cases:

  • When the agent collects a batch of messages of maximum length

  • When it receives a message but still has room for more messages

  • When the timeout elapses but the agent has some pending messages

  • When the timeout elapses and the agent doesn't have any pending messages
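The two possible results of TryReceive can be observed in isolation. The sketch below is an illustration under stated assumptions: it uses a plain MailboxProcessor<string> (the standard F# type that Agent<'T> is assumed to alias), and the describe function, the results list, and the timeout values are hypothetical names and numbers chosen for the demonstration.

```fsharp
open System.Threading

/// Hypothetical helper: turns the option returned by TryReceive into text
let describe (msg: string option) =
    match msg with
    | Some text -> sprintf "received %s" text
    | None -> "timed out"

let results = ResizeArray<string>()
let finished = new ManualResetEvent(false)

let agent = MailboxProcessor<string>.Start(fun inbox -> async {
    // A message is posted right after the agent starts, so this is expected to return Some
    let! first = inbox.TryReceive(timeout = 2000)
    results.Add(describe first)
    // Nothing else is posted, so this returns None after about 200 ms
    let! second = inbox.TryReceive(timeout = 200)
    results.Add(describe second)
    finished.Set() |> ignore })

agent.Post("hello")

// Wait (at most five seconds) until the agent has handled both cases
finished.WaitOne(5000) |> ignore
printfn "%A" (List.ofSeq results)
```

The batch processing agent distinguishes the same two results and additionally checks how many messages are already pending, which yields the four cases listed above.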

The implementation of the agent directly encodes the four cases above. In reaction to the messages, it may trigger the event and then it continues waiting for more messages:

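let rec loop remainingTime messages = async {
    // Measure how long the agent waits for the next message
    let start = DateTime.Now
    let! msg = agent.TryReceive(timeout = max 0 remainingTime)
    let elapsed = int (DateTime.Now - start).TotalMilliseconds
    match msg with 
    // The batch is full: report it and start a new one
    | Some(msg) when List.length messages = batchSize - 1 ->
        batchEvent.Trigger(msg :: messages |> List.rev |> Array.ofList)
        return! loop timeout []
    // Still room in the batch: keep waiting for the rest of the time
    | Some(msg) ->
        return! loop (remainingTime - elapsed) (msg::messages)
    // Timeout with pending messages: report a smaller batch
    | None when List.length messages <> 0 -> 
        batchEvent.Trigger(messages |> List.rev |> Array.ofList)
        return! loop timeout []
    // Timeout with nothing pending: just restart the timer
    | None -> 
        return! loop timeout [] }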

The agent keeps two pieces of state when looping. The parameter remainingTime stores the time until a batch needs to be returned to the client, and messages is a list of messages collected so far. For example, when the timeout is 10 seconds and the agent receives a message after 7 seconds of waiting, the messages list will contain a single element, and remainingTime will be 3 seconds.
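The state transitions can also be written as a pure function, which makes the example with the 10-second timeout easy to check. This is a sketch for illustration only: step is a hypothetical helper that is not used by the agent, and it takes the elapsed time and the received message as ordinary arguments instead of obtaining them from TryReceive.

```fsharp
/// Hypothetical pure model of one iteration of the agent loop.
/// Returns the next (remainingTime, messages) state and the batch to report, if any.
let step batchSize timeout (remainingTime, messages) elapsed msg =
    match msg with
    | Some m when List.length messages = batchSize - 1 ->
        // Batch is full: report it and reset the state
        (timeout, []), Some(List.rev (m :: messages))
    | Some m ->
        // Room for more: subtract the waiting time and remember the message
        (remainingTime - elapsed, m :: messages), None
    | None when not (List.isEmpty messages) ->
        // Timeout with pending messages: report them and reset the state
        (timeout, []), Some(List.rev messages)
    | None ->
        // Timeout with nothing pending: only reset the timer
        (timeout, []), None

// Timeout of 10 seconds, first message received after 7 seconds of waiting
let state1, batch1 = step 5 10000 (10000, []) 7000 (Some "a")
// state1 = (3000, ["a"]) and batch1 = None

// No further message arrives during the remaining 3 seconds
let state2, batch2 = step 5 10000 state1 3000 None
// state2 = (10000, []) and batch2 = Some ["a"]
```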

The TryReceive method is called with the remaining time as an argument. A negative value would mean that the method should wait forever, so the snippet ensures that the remaining time is never negative. After receiving a message, the snippet handles the four cases described above. When the last message in a batch is received, the agent sends the batch to the client by calling batchEvent.Trigger. If the batch is not yet full, the agent subtracts the time spent waiting from the remaining time and continues looping. A result of None signifies that the timeout has elapsed. If there are any pending messages, the agent sends them to the client. In either case, it resets the timer and continues waiting for the next batch.
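To try the agent outside a user interface, the type declaration and the loop body can be combined in a single script. The listing below is a sketch that repeats the article's code with the batchSize constructor parameter used throughout. The Agent alias, the batcher and received names, the batch size of 3, and the 500 ms timeout are assumptions made for this console test.

```fsharp
open System
open System.Threading

// Assumption: Agent<'T> is an alias for the standard MailboxProcessor<'T>
type Agent<'T> = MailboxProcessor<'T>

type BatchProcessingAgent<'T>(batchSize, timeout) =
    let batchEvent = new Event<'T[]>()
    let agent : Agent<'T> = Agent.Start(fun agent ->
        let rec loop remainingTime messages = async {
            let start = DateTime.Now
            let! msg = agent.TryReceive(timeout = max 0 remainingTime)
            let elapsed = int (DateTime.Now - start).TotalMilliseconds
            match msg with
            | Some(msg) when List.length messages = batchSize - 1 ->
                batchEvent.Trigger(msg :: messages |> List.rev |> Array.ofList)
                return! loop timeout []
            | Some(msg) ->
                return! loop (remainingTime - elapsed) (msg :: messages)
            | None when List.length messages <> 0 ->
                batchEvent.Trigger(messages |> List.rev |> Array.ofList)
                return! loop timeout []
            | None ->
                return! loop timeout [] }
        loop timeout [])

    member x.BatchProduced = batchEvent.Publish
    member x.Enqueue(v) = agent.Post(v)

// Hypothetical console test: four messages, batches of at most three
let batcher = new BatchProcessingAgent<int>(3, 500)
let received = ResizeArray<int[]>()
let finished = new CountdownEvent(2)

batcher.BatchProduced.Add(fun batch ->
    lock received (fun () -> received.Add(batch))
    finished.Signal() |> ignore)

for i in 1 .. 4 do batcher.Enqueue(i)

// The first batch is full almost immediately; the second one is
// reported only after the 500 ms timeout elapses
let completed = finished.Wait(5000)
printfn "%A" (List.ofSeq received)
```

When the four messages are posted well within the timeout, as is expected here, the first batch contains 1, 2, and 3, and the remaining message is reported on its own after the timeout.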

This completes the implementation of the agent. The next section demonstrates how to use the agent for batch processing of user interface events.

The batch processing agent can be useful in a wide range of applications. It can be used in servers or other applications that need to store data in a database or in concurrent systems that receive updates via messages (for example, updates from a social network or messages in trading). This section looks at an example based on a user interface, which is the easiest way to demonstrate the agent.

The example creates a new form that handles KeyPress messages from the system and displays the entered text with a delay. After pressing five keys, the text is updated immediately. When the user presses a smaller number of keys, the form is updated after a 5-second timeout. Behavior like this can be useful when sending requests to the server. The application needs to send updates when there are significant changes, but it shouldn't flood the server.

// Initialize application user interface
open System 
open System.Drawing
open System.Windows.Forms

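let frm = new Form()
let lbl = new Label(Dock = DockStyle.Fill)
// Assumed here so that the label appears when the script is run
frm.Controls.Add(lbl)
frm.Show()

// Create agent for batching KeyPress events
let ag = new BatchProcessingAgent<_>(5, 5000)
frm.KeyPress.Add(fun e -> ag.Enqueue(e.KeyChar))

// Turn each batch into a string, accumulate the text and display it
ag.BatchProduced
    |> Event.map (fun chars -> new String(chars))
    |> Event.scan (+) ""
    |> Event.add (fun str -> lbl.Text <- str)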

The user interface for the example is just a single form with a label for displaying the entered text. The interesting part is the last few lines of code. The snippet first creates a new BatchProcessingAgent and specifies 5 as the batch size and 5000 ms as the timeout. Then it adds a KeyPress handler that sends typed characters to the agent.

The handler for the BatchProduced event uses F# functions for event processing. The Event.map function turns the original event that carries arrays of chars into an event that carries strings. The Event.scan function creates an event that concatenates strings and reports the complete text written so far. Finally, Event.add creates a handler that displays the text on the form. When running the program, the text updates either after writing 5 characters or after 5 seconds, whichever happens first.
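The event pipeline can be explored without a form or an agent. The following sketch replaces BatchProduced with a manually triggered Event<char[]>. The source and shown names are hypothetical, and the list stands in for the label.

```fsharp
open System

// Stand-in for the BatchProduced event: batches are triggered by hand
let source = new Event<char[]>()
// Stand-in for the label: records every text that would be displayed
let shown = ResizeArray<string>()

source.Publish
|> Event.map (fun chars -> new String(chars))   // char[] -> string
|> Event.scan (+) ""                            // append to the text so far
|> Event.add (fun text -> shown.Add(text))      // "display" the text

source.Trigger([| 'h'; 'e'; 'l'; 'l'; 'o' |])   // a full batch of five keys
source.Trigger([| '!' |])                       // a smaller batch after a timeout

// shown now contains "hello" followed by "hello!"
printfn "%A" (List.ofSeq shown)
```

Each triggered batch results in one update that carries the complete text accumulated so far, which is what the label displays in the form-based example.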

This article looked at how to implement batch processing of incoming messages using an agent. The agent provides a member Enqueue for adding messages and a BatchProduced event, which reports groups of messages to the user of the agent. This makes the agent very easy to integrate into various types of applications, ranging from web servers and trading systems to user interfaces. The type is implemented as an agent, which means that it can be safely accessed from multiple threads.

This article discussed how to implement an agent for batch processing, which is a very common pattern in concurrent systems.

To download the code snippets shown in this article, go to http://code.msdn.microsoft.com/Chapter-2-Concurrent-645370c3

This article is based on Real World Functional Programming: With Examples in F# and C#. Book chapters related to the content of this article are:

  • Book Chapter 13: “Asynchronous and data-driven programming” explains how asynchronous workflows work and uses them to write an interactive script that downloads a large dataset from the Internet.

  • Book Chapter 14: “Writing parallel functional programs” explains how to use the Task Parallel Library to write data-parallel and task-based parallel programs. This approach complements agent-based parallelism in F#.

  • Book Chapter 16: “Developing reactive functional programs” discusses how to write reactive user interfaces using asynchronous workflows and events. This approach is related to agents but more suitable for creating user interfaces.

