Transacted Batching

This sample demonstrates how to batch transacted reads by using Message Queuing (MSMQ). Transacted Batching is a performance optimization feature for transacted reads in queued communication.

Note

The setup procedure and build instructions for this sample are provided later in this topic.

In queued communication, the client communicates with the service using a queue. More precisely, the client sends messages to a queue, and the service receives messages from the queue. Therefore, the service and the client do not have to be running at the same time to communicate using a queue.

This sample demonstrates transacted batching. Transacted batching is a behavior that enables the use of a single transaction when reading many messages in the queue and processing them.

To set up, build, and run the sample

  1. Ensure that you have performed the One-Time Setup Procedure for the Windows Communication Foundation Samples.

  2. If the service is run first, it will check to ensure that the queue is present. If the queue is not present, the service will create one. You can run the service first to create the queue, or you can create one via the MSMQ Queue Manager. Follow these steps to create a queue in Windows Server 2008.

    1. Open Server Manager.

    2. Expand the Features tab.

    3. Right-click Private Message Queues, and select New, Private Queue.

    4. Check the Transactional box.

    5. Enter ServiceModelSamplesTransactedBatching as the name of the new queue, so that it matches the queue name used in the sample configuration.

    Note

    In this sample the client sends a large number of messages (up to 2500) as part of the batch. It is normal for the service application to take some time to process these.

  3. To build the C# or Visual Basic .NET edition of the solution, follow the instructions in Building the Windows Communication Foundation Samples.

  4. To run the sample in a single- or cross-computer configuration, follow the instructions in Running the Windows Communication Foundation Samples.

To run the sample on a computer joined to a workgroup or without Active Directory integration

  1. By default with the NetMsmqBinding, transport security is enabled. There are two relevant properties for MSMQ transport security: MsmqAuthenticationMode and MsmqProtectionLevel. By default, the authentication mode is set to WindowsDomain and the protection level is set to Sign. For MSMQ to provide the authentication and signing feature, the computer must be part of a domain and the Active Directory integration option for MSMQ must be installed. If you run this sample on a computer that does not satisfy these criteria, you receive an error.

  2. If your computer is not part of a domain or does not have Active Directory integration installed, turn off transport security by setting the authentication mode and protection level to None as shown in the following sample configuration:

    <system.serviceModel>
      <behaviors>
        <serviceBehaviors>
          <behavior name="ThrottlingBehavior">
            <serviceMetadata httpGetEnabled="true"/>
            <serviceThrottling maxConcurrentCalls="5"/>
          </behavior>
        </serviceBehaviors>
    
        <endpointBehaviors>
          <behavior name="BatchingBehavior">
            <transactedBatching maxBatchSize="100"/>
          </behavior>
        </endpointBehaviors>
      </behaviors>
      <services>
        <service 
            behaviorConfiguration="ThrottlingBehavior" 
            name="Microsoft.ServiceModel.Samples.OrderProcessorService">
          <host>
            <baseAddresses>
              <add baseAddress="http://localhost:8000/orderProcessor/transactedBatchingSample"/>
            </baseAddresses>
          </host>
          <!-- Define NetMsmqEndpoint -->
          <endpoint address="net.msmq://localhost/private/ServiceModelSamplesTransactedBatching"
                    binding="netMsmqBinding"
                    bindingConfiguration="Binding1" 
                    behaviorConfiguration="BatchingBehavior" 
                    contract="Microsoft.ServiceModel.Samples.IOrderProcessor" />
          <endpoint address="mex"
                    binding="mexHttpBinding"
                    contract="IMetadataExchange" />
        </service>
      </services>
    
      <bindings>
        <netMsmqBinding>
          <binding name="Binding1">
            <security mode="None" />
          </binding>
        </netMsmqBinding>
      </bindings>
    
    </system.serviceModel>
    
  3. Ensure that you change the configuration on both the server and the client before you run the sample.

    Note

    Setting the security mode to None is equivalent to setting MsmqAuthenticationMode, MsmqProtectionLevel, and Message security to None.

  4. To run the database on a remote computer, change the connection string to point to the computer on which the database resides.
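To make the note about the security mode more concrete, the following fragment is a minimal sketch of the longer form, in which the two transport properties are turned off individually instead of setting the mode to None. It assumes the same binding name (Binding1) as the sample configuration and leaves the security mode at Transport, which is the default for NetMsmqBinding. It is shown for comparison only; the sample itself uses the shorter mode="None" form.

```xml
<bindings>
  <netMsmqBinding>
    <binding name="Binding1">
      <!-- Sketch: transport security stays selected, but authentication
           and signing are both switched off explicitly. -->
      <security mode="Transport">
        <transport msmqAuthenticationMode="None"
                   msmqProtectionLevel="None" />
      </security>
    </binding>
  </netMsmqBinding>
</bindings>
```

Whichever form you choose, apply the same setting to the client and the service.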

Requirements

To run this sample, MSMQ must be installed and SQL Server or SQL Server Express is required.

Demonstrates

The sample demonstrates transacted batching behavior. Transacted batching is a performance optimization feature provided with MSMQ queued transport.

When transactions are used to send and receive messages, there are actually two separate transactions. When the client sends messages within the scope of a transaction, the transaction is local to the client and the client queue manager. When the service receives messages within the scope of a transaction, the transaction is local to the service and the receiving queue manager. It is very important to remember that the client and the service are not participating in the same transaction; rather, they are using different transactions when performing their operations (such as send and receive) with the queue.

In the sample we use a single transaction for the execution of multiple service operations. This is used only as a performance optimization feature and does not impact the semantics of the application. The sample is based on Transacted MSMQ Binding.

Comments

In this sample, the client sends a batch of messages to the service from within the scope of a transaction. To show the performance optimization, we send a large number of messages; in this case, up to 2500 messages.

The messages sent to the queue are then received by the service within the transaction scope defined by the service. Without batching, each invocation of the service operation runs in its own transaction, which results in 2500 transactions and degrades the performance of the system. Because two resource managers are involved (the MSMQ queue and the Orders database), each such transaction is a DTC transaction. We optimize this by reading a batch of messages and invoking the corresponding service operations in a single transaction, which requires a much smaller number of transactions.

We use the batching feature by:

  • Specifying transacted batching behavior in configuration.

  • Specifying a batch size in terms of number of messages to be read using a single transaction.

  • Specifying the maximum number of concurrent batches to run.

In this example, we show performance gains by reducing the number of transactions: 100 service operations are invoked in a single transaction before that transaction is committed.
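As a rough worked figure, using only the numbers given in this topic (2500 messages and a batch size of 100), the transaction counts on the service side compare as follows. Because the batch size is only a hint and a batch can be committed before it is full, the batched figure is a lower bound rather than an exact count.

```latex
\[
N_{\text{unbatched}} = 2500,
\qquad
N_{\text{batched}} \;\ge\; \left\lceil \frac{2500}{100} \right\rceil = 25
\]
```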

The service behavior defines an operation behavior with TransactionScopeRequired set to true. This ensures that the same transaction scope that is used to retrieve the message from the queue is used by any resource managers accessed by the method. In this example, we use a basic database to store the purchase order information contained in the message. The transaction scope also guarantees that if the method throws an exception, the message is returned to the queue. Without setting this operation behavior, a queued channel creates a transaction to read the message from the queue and commits it automatically before it is dispatched so that if the operation fails, the message is lost. The most common scenario is for service operations to enlist in the transaction that is used to read the message from the queue as demonstrated in the following code.

Note that ReleaseServiceInstanceOnTransactionComplete is set to false. This is an important requirement for batching. The ReleaseServiceInstanceOnTransactionComplete property on ServiceBehaviorAttribute indicates what to do with the service instance once the transaction is completed. By default, the service instance is released upon completing the transaction. The core aspect of batching is the use of a single transaction for reading and dispatching many messages in the queue. Therefore, releasing the service instance ends up completing the transaction prematurely, negating the very use of batching. If this property is set to true and transacted batching behavior is added to the endpoint, the batching validation behavior throws an exception.

// Service class that implements the service contract.
// Added code to write output to the console window.
[ServiceBehavior(ReleaseServiceInstanceOnTransactionComplete = false,
                 TransactionIsolationLevel = System.Transactions.IsolationLevel.Serializable,
                 ConcurrencyMode = ConcurrencyMode.Multiple)]
public class OrderProcessorService : IOrderProcessor
{
    [OperationBehavior(TransactionScopeRequired = true,
                       TransactionAutoComplete = true)]
    public void SubmitPurchaseOrder(PurchaseOrder po)
    {
        Orders.Add(po);
        Console.WriteLine("Processing {0} ", po);
    }
    …
}

The Orders class encapsulates the processing of the order. In the sample, it updates the database with purchase order information.

// Order Processing Logic
using System;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Transactions;

public class Orders
{
    public static void Add(PurchaseOrder po)
    {
        // Command that inserts the purchase order.
        SqlCommand insertPurchaseOrderCommand = new SqlCommand(
            "insert into PurchaseOrders(poNumber, customerId) " +
            "values(@poNumber, @customerId)");
        insertPurchaseOrderCommand.Parameters.Add("@poNumber", SqlDbType.VarChar, 50);
        insertPurchaseOrderCommand.Parameters.Add("@customerId", SqlDbType.VarChar, 50);

        // Command that inserts a product line item.
        SqlCommand insertProductLineItemCommand = new SqlCommand(
            "insert into ProductLineItems(productId, unitCost, quantity, poNumber) " +
            "values(@productId, @unitCost, @quantity, @poNumber)");
        insertProductLineItemCommand.Parameters.Add("@productId", SqlDbType.VarChar, 50);
        insertProductLineItemCommand.Parameters.Add("@unitCost", SqlDbType.Float);
        insertProductLineItemCommand.Parameters.Add("@quantity", SqlDbType.Int);
        insertProductLineItemCommand.Parameters.Add("@poNumber", SqlDbType.VarChar, 50);

        int rowsAffected = 0;

        // TransactionScopeOption.Required joins the ambient transaction if one
        // exists, so the database work enlists in the transaction that was used
        // to read the message from the queue.
        using (TransactionScope scope =
            new TransactionScope(TransactionScopeOption.Required))
        {
            using (SqlConnection conn = new SqlConnection(
                ConfigurationManager.AppSettings["connectionString"]))
            {
                conn.Open();

                // Insert into purchase order table.
                insertPurchaseOrderCommand.Connection = conn;
                insertPurchaseOrderCommand.Parameters["@poNumber"].Value = po.PONumber;
                insertPurchaseOrderCommand.Parameters["@customerId"].Value = po.CustomerId;
                insertPurchaseOrderCommand.ExecuteNonQuery();

                // Insert into product line item table.
                insertProductLineItemCommand.Connection = conn;
                foreach (PurchaseOrderLineItem orderLineItem in po.orderLineItems)
                {
                    insertProductLineItemCommand.Parameters["@poNumber"].Value = po.PONumber;
                    insertProductLineItemCommand.Parameters["@productId"].Value = orderLineItem.ProductId;
                    insertProductLineItemCommand.Parameters["@unitCost"].Value = orderLineItem.UnitCost;
                    insertProductLineItemCommand.Parameters["@quantity"].Value = orderLineItem.Quantity;
                    rowsAffected += insertProductLineItemCommand.ExecuteNonQuery();
                }
                scope.Complete();
            }
        }
        Console.WriteLine(
            "Updated database with {0} product line items for purchase order {1} ",
            rowsAffected, po.PONumber);
    }
}

The batching behavior and its configuration are specified in the service application configuration.

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <!-- Use appSetting to configure MSMQ queue name. -->
    <add key="queueName" 
     value=".\private$\ServiceModelSamplesTransactedBatching" />
    <add key="baseAddress" 
     value=
     "http://localhost:8000/orderProcessor/transactedBatchingSample"/>
    <add key="connectionString" value="Data Source=.\SQLEXPRESS;AttachDbFilename=|DataDirectory|orders.mdf;Integrated Security=True;User Instance=True;" />
  </appSettings>
  
  <system.serviceModel>
    <behaviors>
      <serviceBehaviors>
        <behavior name="ThrottlingBehavior">
          <serviceThrottling maxConcurrentCalls="5"/>
        </behavior>
      </serviceBehaviors>
      
      <endpointBehaviors>
        <behavior name="BatchingBehavior">
          <transactedBatching maxBatchSize="100"/>
        </behavior>
      </endpointBehaviors>
    </behaviors>
    <services>
      <service 
          behaviorConfiguration="ThrottlingBehavior" 
          name="Microsoft.ServiceModel.Samples.OrderProcessorService">
        <!-- Define NetMsmqEndpoint -->
        <endpoint address=
"net.msmq://localhost/private/ServiceModelSamplesTransactedBatching"
                  binding="netMsmqBinding"
                  behaviorConfiguration="BatchingBehavior" 
                  contract=
                    "Microsoft.ServiceModel.Samples.IOrderProcessor" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

Note

The batch size is a hint to the system. For example, if you specify a batch size of 20, then 20 messages would be read and dispatched using a single transaction and then the transaction is committed. But there are cases where the transaction may commit the batch before the batch size is reached.

Associated with every transaction is a timeout that starts ticking once the transaction is created. When this timeout expires, the transaction is aborted. It is possible for this timeout to expire even before the batch size is reached. To avoid reworking the batch because of the abort, the TransactedBatchingBehavior checks how much time is left on the transaction. If 80% of the transaction timeout is used up, the transaction is committed.

If there are no more messages in the queue, then instead of waiting for the batch size to be reached, the TransactedBatchingBehavior commits the transaction.
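The three commit conditions described above (batch size reached, 80% of the timeout used, queue empty) can be summarized as a small decision function. The following is a minimal sketch for illustration only: BatchCommitSketch and ShouldCommit are hypothetical names, not part of WCF, and the code does not reflect how TransactedBatchingBehavior is actually implemented. The rule that an empty batch is never committed is also an assumption of this sketch.

```csharp
using System;

// Hypothetical helper for illustration; not part of WCF.
public static class BatchCommitSketch
{
    // Returns true when a batch would be committed under the rules
    // described in this topic.
    public static bool ShouldCommit(int messagesInBatch, int maxBatchSize,
                                    TimeSpan elapsed, TimeSpan timeout,
                                    bool queueEmpty)
    {
        // Assumption of this sketch: nothing has been read, nothing to commit.
        if (messagesInBatch <= 0)
        {
            return false;
        }

        // The batch size (a hint) has been reached.
        if (messagesInBatch >= maxBatchSize)
        {
            return true;
        }

        // No more messages are waiting, so do not wait to fill the batch.
        if (queueEmpty)
        {
            return true;
        }

        // At least 80% of the transaction timeout has been used.
        // Integer tick arithmetic (elapsed / timeout >= 4 / 5) avoids
        // floating-point rounding.
        return elapsed.Ticks * 5 >= timeout.Ticks * 4;
    }
}
```

For example, with a batch size of 100 and a 60-second timeout, a batch of 20 messages would be committed once 48 seconds have elapsed, or earlier if the queue runs empty.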

The choice of the batch size is dependent on your application. If the batch size is too small, you may not get the desired performance. On the other hand, if the batch size is too big, performance may deteriorate. For example, your transaction could live longer and hold locks on your database, or your transaction could become deadlocked, which could cause the batch to be rolled back and the work to be redone.

The client creates a transaction scope for each purchase order it sends. Communication with the queue takes place within that scope, so the send is treated as an atomic unit: the message is placed on the queue only if the transaction commits. The transaction is committed by calling Complete on the transaction scope.

//Client implementation code.
class Client
{
    static void Main()
    {
        Random randomGen = new Random();
        for (int i = 0; i < 2500; i++)
        {
            // Create a client with given client endpoint configuration.
            OrderProcessorClient client = new OrderProcessorClient("OrderProcessorEndpoint");

            // Create the purchase order.
            PurchaseOrder po = new PurchaseOrder();
            po.CustomerId = "somecustomer" + i + ".com";
            po.PONumber = Guid.NewGuid().ToString();

            PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
            lineItem1.ProductId = "Blue Widget";
            lineItem1.Quantity = randomGen.Next(1, 100);
            lineItem1.UnitCost = (float)randomGen.NextDouble() * 10;

            PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
            lineItem2.ProductId = "Red Widget";
            lineItem2.Quantity = 890;
            lineItem2.UnitCost = 45.89F;

            po.orderLineItems = new PurchaseOrderLineItem[2];
            po.orderLineItems[0] = lineItem1;
            po.orderLineItems[1] = lineItem2;

            //Create a transaction scope.
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            {
                // Make a queued call to submit the purchase order.
                client.SubmitPurchaseOrder(po);
                // Complete the transaction.
                scope.Complete();
            }

            client.Close();
        }
        Console.WriteLine();
        Console.WriteLine("Press <ENTER> to terminate client.");
        Console.ReadLine();
    }
}
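The string passed to the OrderProcessorClient constructor names an endpoint in the client's configuration file. The following fragment is a sketch of what that entry might look like. It assumes the client targets the same queue address as the service configuration shown earlier and that the client-side contract type carries the same name as the service contract; the name in the sample's generated proxy may differ.

```xml
<system.serviceModel>
  <client>
    <!-- The name must match the string passed to the
         OrderProcessorClient constructor. -->
    <endpoint name="OrderProcessorEndpoint"
              address="net.msmq://localhost/private/ServiceModelSamplesTransactedBatching"
              binding="netMsmqBinding"
              contract="Microsoft.ServiceModel.Samples.IOrderProcessor" />
  </client>
</system.serviceModel>
```

If you turned off transport security on the service for a workgroup computer, the client endpoint also needs a bindingConfiguration that refers to a netMsmqBinding with the same security setting.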

When you run the sample, the client and service activities are displayed in both the service and client console windows. You can see the service receive messages from the client. Press ENTER in each console window to shut down the service and client. Note that because queuing is in use, the client and service do not have to be up and running at the same time. You can run the client, shut it down, and then start up the service and it still receives its messages. You can see a rolling output as messages are read in a batch and processed.

The service is ready.
Press <ENTER> to terminate service.

Updated database with 2 product line items for purchase order 493ac832-d216-4e94-b2a5-d7f492fb5e39
Processing Purchase Order: 8b567f5b-0661-4662-aae2-6cef1bd6d278
        Customer: somecustomer849.com
        OrderDetails
               Order LineItem: 80 of Blue Widget @unit price: $9.751623
               Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $41622.23
        Order status: Pending

Updated database with 2 product line items for purchase order 41130b95-4ea8-40a9-91c3-2e129117fcb8
Processing Purchase Order: 5ce2699d-9a31-4cc2-a8c5-64cda614b3c7
        Customer: somecustomer850.com
        OrderDetails
               Order LineItem: 89 of Blue Widget @unit price: $6.369128
               Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $41408.95
        Order status: Pending

Updated database with 2 product line items for purchase order 8b567f5b-0661-4662-aae2-6cef1bd6d278
Processing Purchase Order: ea94486b-7c86-4309-a42d-2f06c00656cd
        Customer: somecustomer851.com
        OrderDetails
             Order LineItem: 47 of Blue Widget @unit price: $0.9391424
             Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $40886.24
        Order status: Pending

Important

The samples may already be installed on your computer. Check for the following (default) directory before continuing.

<InstallDrive>:\WF_WCF_Samples

If this directory does not exist, go to Windows Communication Foundation (WCF) and Windows Workflow Foundation (WF) Samples for .NET Framework 4 to download all Windows Communication Foundation (WCF) and WF samples. This sample is located in the following directory.

<InstallDrive>:\WF_WCF_Samples\WCF\Basic\Binding\Net\MSMQ\Batching