Export (0) Print
Expand All
Expand Minimize

Reliable Messaging with MSMQ and .NET

 

Duncan Mackenzie
Microsoft Developer Network

Revised February 2002

Summary: This article describes how to use recoverable messages, transactions (alone or combined with database transactions), and acknowledgements with MSMQ and the Microsoft .NET Framework. (30 printed pages)

Download Bdadotnet_beta2.msi.

Contents

Introduction
Recoverable Messages
Transactions
   Creating Transactional Queues
   Using Internal (MSMQ) Transactions
   Participating in External (DTC) Transactions
Acknowledgements, Time Outs, and Journaling
   Using Acknowledgements
   Setting Time Outs
   Using Journal Queues
Conclusion

Introduction

In a previous article, Accessing Message Queues, you learned how to use .NET to send and receive messages using Microsoft Message Queuing (MSMQ). The material in that earlier article provides the foundation for this article.

MSMQ is ideal for use in a variety of systems as a means of transferring data, but if you are going to use it as part of your system, it is critical that you understand how to make it reliable. If you use MSMQ to send a customer's order from your front-end Web application to your back-end order processing system, you want to make sure the order does not get lost along the way. MSMQ can be configured and used so that it becomes a very reliable system, but increasing reliability comes at the expense of decreased performance, so you have to determine the best mix for your specific application.

Recoverable Messages

By default, MSMQ stores some messages in memory for increased performance, and a message may be sent and received from a queue without ever having been written to disk. This is not normally a problem, and it is completely transparent to you as the developer, but if the MSMQ service were to unexpectedly terminate due to a hardware or software failure, or due to some external circumstance such as a power failure, messages stored only in memory will be lost forever. You can control this behavior, forcing MSMQ to store a message to disk, by specifying that a message should be recoverable. A message can be marked as recoverable by setting the Recoverable property of a Message object before sending, or if you are not using Message objects and are just sending objects directly (described in Accessing Message Queues as the simple method of sending messages) you can control this behavior by setting the DefaultPropertiesToSend property on the MessageQueue object. In the code snippet to follow, a Message object is created, marked as recoverable, and then sent to the local private queue named "Orders."

Microsoft® Visual Basic® .NET

Dim recoverableMessage As New Message()
recoverableMessage.Body = "Sample Recoverable Message"
recoverableMessage.Recoverable = True
Dim msgQ As New MessageQueue(".\private$\Orders")
msgQ.Send(recoverableMessage)

Visual C# .NET

Message recoverableMessage = new Message();
recoverableMessage.Body = "Sample Recoverable Message";
recoverableMessage.Recoverable = true;
MessageQueue msgQ = new MessageQueue(@".\$private\Orders");
msgQ.Send(recoverableMessage);

Note   See Example 1 in the complete BDAdotNetAsync2 sample code (see top of article for download).

The next code snippet shows how to configure the default properties information of the MessageQueue so that all messages sent without using an actual Message object will be marked as Recoverable.

Visual Basic .NET

Dim msgQ As New MessageQueue(".\private$\Orders")
msgQ.DefaultPropertiesToSend.Recoverable = True
msgQ.Send("This message will be marked as Recoverable")

Visual C# .NET

MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
msgQ.DefaultPropertiesToSend.Recoverable = true;
msgQ.Send("This message will be marked as Recoverable");
msgQ.Close();

Note   See Example 2 in the complete BDAdotNetAsync2 sample code (see top of article for download).

For more information on the recoverable flag and its effect on MSMQ's handling of messages, take a look at the white paper, Optimizing Message Queuing Performance.

Transactions

The concept of transactions is part of a variety of different computing technologies, but the general meaning is always the same: to execute a multiple-step process such that either all or none of the steps will complete. In reality, transactions are handled by rolling back any steps that have already occurred if the entire transaction is not completed successfully. Regardless of the technology used to implement transactions, they are described using several basic concepts:

  • The beginning and end of the transaction must be indicated, so that it is known which actions are contained within that transaction.
  • If the transaction completes successfully, it is committed, which causes all the actions to become permanent and the transaction to end.
  • If the transaction fails, for whatever reason, it is aborted and a rollback occurs, undoing any actions that have already been executed before the abort.

For more information on the concept of transactions, as they apply in a more general way to the world of computing, check out the Microsoft® SQL Server™ technical article, Microsoft SQL Server: An Overview of Transaction Processing Concepts and the MS DTC.

MSMQ supports transactions in two different ways: internally by allowing multiple messages to be sent or received as part of a transaction and externally by participating in Distributed Transaction Coordinator (DTC) transactions. The difference between the two types of transactions will be discussed later in this article, but to participate in either type of transaction, you must be using transactional message queues.

Creating Transactional Queues

When a message queue is created, using the management interfaces in Microsoft Windows NT®, Windows® 2000, Windows® XP or programmatically, it can be specified as "transactional."

ms978430.bdadotnetasync2_01(en-us,MSDN.10).gif

Figure 1. You can create a queue and make it support transactions

If you are creating a queue programmatically and you intend on using that queue in transactions, you must specify this when you call the MessageQueue.Create method. The code snippet to follow creates a new transactional queue by supplying an additional parameter to the Create method.

Visual Basic .NET

MessageQueue.Create(".\private$\Orders", True)

Visual C# .NET

MessageQueue.Create(@".\private$\Orders", True);

Note   See Example 3 in the complete BDAdotNetAsync2 sample code (see top of article for download).

You can determine if a queue supports transactions after it has been created by viewing the queue properties through the MSMQ management interfaces, or by checking the Transactional property of the MessageQueue object programmatically. The next code snippet illustrates how to create a new instance of a MessageQueue object and check if it supports transactions.

Visual Basic .NET

Dim msgQ As New MessageQueue(".\private$\Orders")
If msgQ.Transactional Then
    'do transactional work
Else
    'do non-transactional work
End If

Visual C# .NET

MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
if (msgQ.Transactional) 
{
   //do transactional work
}
else
{
   //do non-transactional work
}

Note   See Example 4 in the complete BDAdotNetAsync2 sample code (see top of article for download).

Attempting to use transactions with a non-transactional queue will fail and result in an exception of type MessageQueueException. The reverse, sending or receiving messages from a transactional queue without using transactions, will also fail. If you are using a transactional queue, and you are just sending a single message, you must use a special overload of the MessageQueue.Send method to indicate that each individual send or receive should be wrapped in its own single transaction. This is accomplished by passing the value MessageQueueTransactionType.Single as the second parameter to the Send method.

Using Internal (MSMQ) Transactions

MSMQ provides built-in support for transactions, allowing you to execute multiple actions against one or more queues (sending or receiving messages) all wrapped in a transaction so that you can guarantee that either all or none of the actions will take effect. The transaction support provided within MSMQ is limited to MSMQ actions, however, and you will often be accessing other resources, such as a database, that you also wish to include in the same transaction. To allow for this, MSMQ also supports external transactions, where the Microsoft Distributed Transaction Coordinator (MS DTC), used by COM+ and SQL Server, is used to allow a variety of resources including databases and MSMQ to be included in a single transaction.

Sending and receiving within a transaction

To perform any MSMQ action(s) within an internal transaction, you need to create a MessageQueueTransaction object, begin the transaction, and then supply that object along with each of the actions you wish to include in the transaction.

Visual Basic .NET

Dim msgTx As New MessageQueueTransaction()
msgTx.Begin()

Visual C# .NET

MessageQueueTransaction msgTx = new MessageQueueTransaction();
msgTx.Begin();

Note   See Example 5 in the complete BDAdotNetAsync2 sample code (see top of article for download).

Once you have this MessageQueueTransaction object, the Send and Receive methods of the MessageQueue object both have an overloaded version that can accept a transaction object along with the other required information. As noted earlier, before you can send any messages as part of this new transaction, you must explicitly mark the beginning of the transaction by using the MessageQueueTransaction.Begin method. More details on controlling the transaction are provided in the next section, but note that your actions (send and receive) won't permanently affect the queue unless you commit the transaction, so a call to the MessageQueueTransaction.Commit method is required after the send or else the receive may not find any messages to retrieve. The two pieces of code to follow show how to both send and receive messages within a transaction.

Visual Basic .NET

Dim msgTx As New MessageQueueTransaction()
Dim msgQ As New MessageQueue(".\private$\Orders")

msgTx.Begin()
msgQ.Send("This is a transactional message!", msgTx)
msgTx.Commit()

msgTx.Begin()
Dim msg As Message
msg = msgQ.Receive(msgTx)
msgTx.Commit()

Visual C# .NET

MessageQueueTransaction msgTx = new MessageQueueTransaction();
MessageQueue msgQ = new MessageQueue(@".\private$\Orders");

msgTx.Begin();
msgQ.Send("This is a transactional message!",msgTx);
msgTx.Commit();

msgTx.Begin();
Message msg;
msg = msgQ.Receive(msgTx);
msgTx.Commit();

Note   See Example 6 in the complete BDAdotNetAsync2 sample code (see top of article for download).

Controlling the transaction

The MessageQueueTransaction object itself supplies a variety of methods and properties that you can use to control the transaction and to examine its current status. In addition to the Begin method, which starts a transaction, the two methods that allow you to control how this transaction ends are Commit and Abort, which you can use to mark the transaction as having completed successfully or as having failed and needing to be rolled back. The simplest way to use the MessageQueueTransaction object to ensure that your transaction executes correctly is to wrap your MSMQ code in an error-handling block and to execute Abort if any error occurs and Commit if everything executes successfully. The following code gives an example of this method of transaction handling, sending two messages to a queue as part of transaction and wrapped in a try/catch/finally block. The transaction is aborted in the catch block, committed at the very end of the try block, and all of the objects are disposed of correctly in the finally block, ensuring that this housekeeping step will always occur.

Visual Basic .NET

Dim msgTx As New MessageQueueTransaction()
Dim msgQ As New MessageQueue(".\private$\Orders")
msgTx.Begin()
Try
    msgQ.Send("First Message", msgTx)
    msgQ.Send("Second Message", msgTx)
    msgTx.Commit()
Catch
    msgTx.Abort()
Finally
    msgQ.Close()
End Try

Visual C# .NET

MessageQueueTransaction msgTx = new MessageQueueTransaction();
MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
msgTx.Begin();
try
{
   msgQ.Send("First Message",msgTx);
   msgQ.Send("Second Message",msgTx);
   msgTx.Commit();
}
catch
{
   msgTx.Abort();
}
finally
{
   msgQ.Close();
}

Note   See Example 7 in the complete BDAdotNetAsync2 sample code (see top of article for download).

In addition to the two control methods, the MessageQueueTransaction object also provides access to the current status of the transaction through the Status property, which will return one of four possible values:

  • Initialized. The transaction object has been created, but no actions have occurred yet, so the transaction itself has not been started.
  • Pending. Actions have occurred, but the transaction is still in progress—it has not committed or aborted.
  • Committed. The transaction has completed successfully.
  • Aborted. The transaction was aborted, and has been rolled back.

The Status property is an enumeration representing the four possible values. The code to follow shows how you could use it in your program.

Visual Basic .NET

If msgTx.Status = MessageQueueTransactionStatus.Aborted Then
    Console.WriteLine("Transaction Aborted!")
End If

Visual C# .NET

if (msgTx.Status = MessageQueueTransactionStatus.Aborted)
   Console.WriteLine("Transaction Aborted!");

Note   See Example 8 in the complete BDAdotNetAsync2 sample code (see top of article for download).

A real-world example

There are many different reasons to use transactions when you are working with MSMQ, but here is a specific example to assist you in understanding this technology. Consider a situation where orders are being sent into your back-end system via MSMQ. Your system retrieves messages off of a queue, processes the message, and then sends it on to another queue. At the same time you fire another message off into a log queue that you use to audit the movement of orders within your system. Without transactions, you could retrieve the order (removing it from the incoming queue) but never send it on to another queue due to some serious error. At this point, you have lost the order completely, and that is certainly not acceptable in most systems. A slightly less serious problem is that you could retrieve and process the order successfully but an error prevents you from sending an audit message to your log queue, leaving you with a log that incorrectly states that this order is still waiting to be processed. Either problem can be avoided by wrapping the entire activity, including the read from the incoming queue, the write to the outgoing queue, and the write to the audit queue in a transaction.

Participating in External (DTC) Transactions

MSMQ transactions are very useful, but they are limited to dealing with MSMQ activities only, while most systems that use MSMQ will also be using other resources, such as a database. Using only MSMQ transactions, you could end up with a database that is out of sync with the state of your message queues. Consider the real-world example just discussed, where an order is read off of a queue and then sent to a new queue, followed by a log entry being sent to another queue for auditing purposes. It is not unlikely that a database could be involved in a scenario like that, perhaps as the destination for the order or serving as the location to log audit information. If an error were to occur and MSMQ transactions were in use, the MSMQ activities would be rolled back, but the database work would not. Such a situation is little better than using no transaction processing at all, because the result could still be unreliable data in your system. The solution is to use a form of transaction processing that is capable of handling the database and MSMQ work as part of a single transaction, and that technology is called MS DTC, or Microsoft Distributed Transaction Coordinator. Using DTC allows you to initiate a transaction and then proceed to use resources from multiple sources, such as an SQL Server database and MSMQ, and then have those resources participate in that transaction, committing or rolling back as appropriate when the transaction ends.

Note   DTC only supports resources that have implemented compatible interfaces that manage the transaction for that specific resource. These interfaces are referred to as "Resource Managers" (see the .NET Framework Developer's Guide documentation on Distributed Transactions for more information on this topic) and are provided by several resources at this time, including MSMQ, Microsoft SQL Server, Oracle, Sybase, and others.

Creating a serviced component and setting transaction attributes

To use DTC transactions from your .NET system, you need to take advantage of the services provided by MTS/COM+ by running your code within their environment. Before .NET, this was accomplished by creating a COM component and registering that component as part of a COM+ application (known as a package in MTS) and setting the properties of that application so that COM+ would run it as part of a transaction. Once your component was placed into COM+, and set up to require a transaction, any access from that component to a DTC-compatible resource became part of the same transaction and could be committed or rolled back as a group. In .NET, you accomplish the same results by building a class that inherits from the System.EnterpriseServices.ServicedComponent class and then setting this class up with attributes indicating that it requires a transaction. Creating a class like this enables it to run inside COM+. The process to create, register, and use serviced components is covered in detail in the .NET Framework Developer's Guide documentation titled "Writing Serviced Components." The code to follow shows a sample class configured to run within COM+, with assembly attributes that set up the COM+ application properties and a class attribute <Transaction(TransactionOption.Required)> that indicates a transaction is required when this class is used.

Visual Basic .NET

Imports System.EnterpriseServices
<Assembly: ApplicationName("BDAdotNETAsync2")> 
<Assembly: ApplicationActivation(ActivationOption.Server)> 

<Transaction(TransactionOption.Required)> Public Class BDAdotNETAsync2
    Inherits ServicedComponent

    Public Sub Example9()
    End Sub
End Class

Visual C# .NET

using System.EnterpriseServices;

[assembly: ApplicationName("BDAdotNETAsync2_csharp")]
[assembly: ApplicationActivation(ActivationOption.Server)]

[Transaction(TransactionOption.Required)]
public class BDAdotNETAsync2_example9_csharp:ServicedComponent
{
   public static void Example9()
   {
   }
}

Note   See Example 9 in the complete BDAdotNetAsync2 sample code (see top of article for download).

Using the System.EnterpriseServices.ContextUtil class, the code within your COM+ component can control the transaction in a very similar manner to the internal MSMQ code shown earlier in this article. ContextUtil provides a variety of methods, including SetAbort and SetComplete, which roll back and commit the transaction respectively.

Visual Basic .NET

Try
    insertLogEntry.ExecuteNonQuery()
    ContextUtil.SetComplete()
Catch e As Exception
    ContextUtil.SetAbort()
End Try

Visual C# .NET

try
{
   insertAuthor.ExecuteNonQuery();
   ContextUtil.SetComplete();
}
catch(exception e) 
{
   ContextUtil.SetAbort();
}

Note   See Example 9 in the complete BDAdotNetAsync2_example9 sample code (see top of article for download).

There are many more features of serviced components in general and the ContextUtil class specifically, but they are outside the scope of this article. Check out the link on "Writing Serviced Components" in the .NET Framework for more information on creating and using COM+ services with .NET.

Adding database and MSMQ operations

Within the COM+ component you would then execute whatever operations were needed against the resources that will be participating in your transaction. This might include sending and receiving MSMQ messages and/or accessing a database. There is no special coding required to indicate that you are participating in a transaction; just the fact that you are calling these resources from within a COM+ hosted component is sufficient to enlist them into the current transaction. You will want to add code to control your transaction, however, wrapping your activities in a try/catch/finally block and using the ContextUtil object to abort or commit the transaction as required.

Visual Basic .NET

'Create the queue
Dim msgQ As New MessageQueue(".\private$\Orders")
Dim incomingOrder As Order
'Configure the formatter with the expected type(s)
Dim targetTypes(0) As Type
targetTypes(0) = GetType(Order)
msgQ.Formatter = New XmlMessageFormatter(targetTypes)
'DB Connection setup code removed
Dim sqlConn As New SqlConnection(connectionString)
Dim insertLog As New SqlCommand(sqlString, sqlConn)
'Command setup code removed
Try
    'Receive new message, casting the result to the Order type
    incomingOrder = _
      CType(msgQ.Receive(New TimeSpan(0, 0, 30)).Body, Order)
    'Use ID from message to setup parameters for database insert
    orderIDParam.Value = incomingOrder.ID
    receiveParam.Value = DateTime.Now
    insertLog.Parameters.Add(orderIDParam)
    insertLog.Parameters.Add(receiveParam)
    sqlConn.Open()
    'Insert a new entry into the Log table
    insertLog.ExecuteNonQuery()
    'Mark the transaction as completing successfully
    ContextUtil.SetComplete()
Catch e As Exception
    'If any error occurs, abort the transaction
    ContextUtil.SetAbort()
Finally
    'Always close the connection, regardless of success/failure
    sqlConn.Close()
End Try

Visual C# .NET

//Create the queue
MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
Order incomingOrder;
//Configure the formatter with the expected type(s)
Type[] targetTypes = new Type[1];
targetTypes[0] = typeof(Order);
msgQ.Formatter = new XmlMessageFormatter(targetTypes);
//DB Connection setup code removed
SqlConnection sqlConn = new SqlConnection(connectionString);
SqlCommand insertLog = new SqlCommand(sqlString), sqlConn);
//Command setup code removed
try
{
    //Receive new message, casting the result to the Order type
    incomingOrder = (Order)(msgQ.Receive(new TimeSpan(0, 0, 30)).Body);
    //Use ID from message to setup parameters for database insert
    orderIDParam.Value = incomingOrder.ID;
    receiveParam.Value = DateTime.Now;
    insertLog.Parameters.Add(orderIDParam);
    insertLog.Parameters.Add(receiveParam);
    sqlConn.Open();
    //Insert a new entry into the Log table
    insertLog.ExecuteNonQuery();
    //Mark the transaction as completing successfully
    ContextUtil.SetComplete();
}
catch
{
    //If any error occurs, abort the transaction
    ContextUtil.SetAbort();
}
finally
{
    //Always close the connection, regardless of success/failure
    sqlConn.Close();
}

Note   See Example 9 in the complete BDAdotNetAsync2 sample code (see top of article for download).

Alternatively, instead of explicitly using the ContextUtil object to control your transaction, you can also use an attribute to tell COM+ that it should automatically commit or abort the transaction based on what happens inside your code. By placing the attribute <AutoComplete()> on your procedure, the transaction will be automatically committed—as is the case with ContextUtil.SetComplete()—if the procedure ends without raising any exceptions, and automatically aborted—as is the case with ContextUtil.SetAbort()—if an exception is raised and not handled within the procedure itself. To make this work correctly, you will need to either remove the error-handling block from your code or raise another exception from the catch block to ensure that the transaction is automatically aborted. I recommend the latter method, as it is better to explicitly catch exceptions in your code then to just remove all error handling.

<AutoComplete()> Public Sub Example9()
    '....
End Sub

In the end, when you have built your component and installed it into COM+, clients can create and call your component just like any other .NET class, and your component will automatically run inside COM+ and act as part of a transaction.

Note   The Microsoft .NET Framework Developer's Guide article on "Writing Serviced Components" should assist you in building a serviced component, but remember that your assembly must have a strong name and must be installed into the global assembly cache (using the gacutil command-line tool) to work correctly as a COM+ application.

Acknowledgements, Time Outs, and Journaling

Due to the asynchronous nature of using message queues, using either internal or external transactions in MSMQ can be somewhat confusing if you are used to using transactions only with databases. If you were to consider an order-processing example, where a message is being placed into a queue at the front end of your system, received by a process at the back end, processed and then saved into a database, it would be possible in the database world to wrap a transaction around this entire process. With MSMQ, this is not the case, because the send/receive processes are loosely coupled. You can create transactions that wrap each individual side of the process—creating one around the sending of the message to MSMQ and another around the retrieval of the message and the saving of that message into a database—but you cannot place a transaction around the entire system from beginning to end. This limitation is just another aspect of an asynchronous system and the result is that different methods must be used to determine if a message has made it all the way through the system correctly. Some of those methods include the use of acknowledgement messages, time outs, and journal queues. Each of these aspects of MSMQ is covered in the sections to follow.

Using Acknowledgements

When you perform an action synchronously, like calling a method on an object, you receive immediate feedback on the success of that action, but with asynchronous processes, a mechanism needs to be provided that will give you the same level of feedback. For this purpose, MSMQ provides acknowledgements, special MSMQ-generated messages that are sent to inform the sending application that a message was successfully delivered to and/or retrieved at the destination. In the examples included in this article, and in the examples from the earlier asynchronous article, messages are sent to a private queue that is located on the local machine, so successful delivery doesn't require an acknowledgement message. But this isn't always the case. In many situations, MSMQ is used in an offline (disconnected) mode, where messages are destined for some remote server but must be stored locally and then forwarded on to that remote server (or another server along the way) when a connection is available. In such a situation, the message has not been delivered when the call to MessageQueue.Send completes, so an acknowledgement of delivery becomes useful.

Requesting acknowledgement messages

Setting up acknowledgements requires the creation of a queue to receive these special messages, which can be any non-transactional queue, and setting certain properties on your messages before you send them. The code shown next creates the new queue and then sends a message after setting the two required properties—AdministrationQueue, which is used to indicate where the acknowledgement messages are sent, and AcknowledgeTypes, which indicates what particular events should cause a message to be sent.

Visual Basic .NET

If Not MessageQueue.Exists(".\private$\Ack") Then
    MessageQueue.Create(".\private$\Ack", False)
End If

Dim msgQ As New MessageQueue(".\private$\Orders")
Dim myMessage As New Message()
myMessage.Body = "Sample"
myMessage.AdministrationQueue = New MessageQueue(".\private$\Ack")
myMessage.AcknowledgeType = AcknowledgeTypes.FullReachQueue _
                            Or AcknowledgeTypes.FullReceive

msgQ.Send(myMessage, MessageQueueTransactionType.Single)

Visual C# .NET

MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
Message myMessage = new Message();
myMessage.Body = "Sample";
myMessage.AdministrationQueue = new MessageQueue(@".\private$\Ack");
myMessage.AcknowledgeType = AcknowledgeTypes.FullReachQueue
                            | 
AcknowledgeTypes.FullReceive;
msgQ.Send(myMessage, MessageQueueTransactionType.Single);

Note   See Example 10 in the complete BDAdotNetAsync2 sample code (see top of article for download).

The available settings for AcknowledgeTypes are:

  • FullReachQueue (success and failure to be delivered to the queue)
  • FullReceive (success and failure to be received)
  • NegativeReceive (failure to be received)
  • NotAcknowledgeReachQueue (failure to be delivered)
  • NotAcknowledgeReceive (failure to be received)
  • PositiveArrival (successful delivery)
  • PositiveReceive (successful receive)

This property is a mask bit, allowing you to combine multiple types of acknowledgment to produce the results you desire. You could, for instance, specify PositiveArrival Or PositiveReceive to indicate that you want all success acknowledgements. The preceding code snippet specified FullReachQueue Or FullReceive to indicate that all positive and negative results should be acknowledged. As with almost any Message property, you can also set these properties through the DefaultPropertiesToSend property of the MessageQueue object, allowing you to use the simple method of sending messages and still use the acknowledgement features.

Visual Basic .NET

Dim msgQ As New MessageQueue(".\private$\Orders")

With msgQ.DefaultPropertiesToSend
    .AdministrationQueue = New MessageQueue(".\private$\Ack")
    .AcknowledgeType = AcknowledgeTypes.FullReachQueue _
                       Or AcknowledgeTypes.FullReceive
End With
msgQ.Send("Sample", MessageQueueTransactionType.Single)

Visual C# .NET

if (!MessageQueue.Exists(@".\private$\Ack"))
   MessageQueue.Create(@".\private$\Ack",false);
MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
msgQ.DefaultPropertiesToSend.AdministrationQueue =
      new MessageQueue(@".\private$\Ack");
msgQ.DefaultPropertiesToSend.AcknowledgeType =
      AcknowledgeTypes.FullReachQueue
      | AcknowledgeTypes.FullReceive;
msgQ.Send("Sample", MessageQueueTransactionType.Single);

Note   See Example 10b in the complete BDAdotNetAsync2 sample code (see top of article for download).

Processing acknowledgements

For acknowledgements to be of much use, your application has to do more than just request them—it also needs to receive them. Acknowledgement messages are a little different than regular MSMQ messages; they consist only of header information with no body, but receiving them works in the same manner as a regular message. Because there is no body, it is the other properties of the Message object that are important when you are receiving an acknowledgement message, but not every property is automatically received by the MessageQueue.Receive() method. There is another property of the MessageQueue object: MessageReadPropertyFilter, which is used to control exactly which message properties are received. If you look at the documentation in the .NET Framework Class Library for the MessagePropertyFilter class, you will find a list of which properties are retrieved by default, but the best bet is to explicitly tell the queue to receive the properties you need. The code snippet to follow does just that, informing the queue that the CorrelationID property should be received.

Dim adminQ As New MessageQueue(".\private$\Ack")
adminQ.MessageReadPropertyFilter.CorrelationId = True

The CorrelationID, a property that is empty on a regular message, is populated with the ID of the message to which an acknowledgement refers. The code snippet shown next sends a message, specifying that acknowledgements should be used, receives that message, and then executes a loop to receive all the available messages from the administration queue.

Visual Basic .NET

'Send Message
Dim msgQ As New MessageQueue(".\private$\Orders")
With msgQ.DefaultPropertiesToSend
    .AcknowledgeType = AcknowledgeTypes.FullReachQueue Or _
                        AcknowledgeTypes.FullReceive
    .AdministrationQueue = New MessageQueue(".\private$\Ack")
End With
msgQ.Send("Sample", MessageQueueTransactionType.Single)

'Receive Message
Dim msg As Message
msg = msgQ.Receive(New TimeSpan(0))
Console.WriteLine("Message Received {0}", msg.Id)

'Receive Acknowledgement Messages
Dim adminQ As New MessageQueue(".\private$\Ack")
adminQ.MessageReadPropertyFilter.CorrelationId = True
Do
    Try
        msg = adminQ.Receive(New TimeSpan(0, 0, 5))
        If msg.MessageType = MessageType.Acknowledgment Then
            Select Case msg.Acknowledgment
                Case Acknowledgment.ReachQueue
                    Console.WriteLine("Message Reached Queue {0}", _
                        msg.CorrelationId)
                Case Acknowledgment.Receive
                    Console.WriteLine("Message Received {0}", _
                        msg.CorrelationId)
            End Select
        End If
    Catch e As Exception
        Console.WriteLine(e.Message)
        msg = Nothing
    End Try
Loop Until msg Is Nothing

Visual C# .NET

//Send Message
MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
msgQ.DefaultPropertiesToSend.AcknowledgeType =
      AcknowledgeTypes.FullReachQueue |
      AcknowledgeTypes.FullReceive;
msgQ.DefaultPropertiesToSend.AdministrationQueue =
      new MessageQueue(@".\private$\Ack");
msgQ.Send("Sample");

//Receive Message
Message msg;
msg = msgQ.Receive(new TimeSpan(0), MessageQueueTransactionType.Single);
Console.WriteLine("Message Received {0}", msg.Id);

//Receive Acknowledgement Messages
MessageQueue adminQ = new MessageQueue(@".\private$\Ack");
adminQ.MessageReadPropertyFilter.CorrelationId = True;
do
{
   try
   {
      msg = adminQ.Receive(new TimeSpan(0,0,5));
      if (msg.MessageType = MessageType.Acknowledgment)
         switch(msg.Acknowledgment) 
         {
            case Acknowledgment.ReachQueue:
               Console.WriteLine("Message Reached Queue 
{0}",
                  msg.CorrelationId);
               break;
            case Acknowledgment.Receive:
               Console.WriteLine("Message Received {0}",
                  msg.CorrelationId);
               break;
         }
   }
   catch(Exception e)
   {
      Console.WriteLine(e.Message);
      msg = null;
   }
}
while (!msg = null);

Note   See Example 11 in the complete BDAdotNetAsync2 sample code (see top of article for download).

In a disconnected scenario, the system sending messages could request acknowledgements and then use those messages to update status within its own database, or simply use the administration queue as an auditing mechanism.

Setting Time Outs

It is possible that an error could cause a message to not be received or delivered, but what if a message is delivered successfully and it is never retrieved? Using only acknowledgements is not enough to handle this situation, because no failure has occurred, and therefore no message will be sent back to your system. What you need is a way to tell MSMQ that your message must be delivered or received within a certain amount of time, or else it is to be considered a failed transmission. This functionality is provided in MSMQ through time outs, which allow you to independently specify an amount of time allowed for the delivery of the message and for the receipt of the message from the destination queue. Both of these time outs can be specified when you send a message, and both have the same effect: If the message is not delivered or received within the time specified, MSMQ treats it as an error, removes the message from the queue it is currently in, and sends any acknowledgements that were requested.

Visual Basic .NET

Dim msgQ As New MessageQueue(".\private$\Orders")
Dim myMessage As New Message()

With myMessage
    .Body = "This is a time sensitive message!"

    'Must Reach Queue within 10 minutes
    .TimeToReachQueue = New TimeSpan(0, 10, 0)

    'Must be received within 1 hour
    .TimeToBeReceived = New TimeSpan(1, 0, 0)

    'Request Acknowledgment, using .\private$\Ack
    .AcknowledgeType = AcknowledgeTypes.FullReachQueue Or _
                        AcknowledgeTypes.FullReceive
    .AdministrationQueue = New MessageQueue(".\private$\Ack")
End With

msgQ.Send(myMessage, MessageQueueTransactionType.Single)

Visual C# .NET

MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
Message myMessage = new Message();

myMessage.Body = "This is a time sensitive message!";

//Must Reach Queue within 10 minutes
myMessage.TimeToReachQueue = new TimeSpan(0, 10, 0);

//Must be received within 1 hour
myMessage.TimeToBeReceived = new TimeSpan(0, 0, 1);

//Request Acknowledgment, using .\private$\Ack
myMessage.AcknowledgeType = AcknowledgeTypes.FullReachQueue 
                     | AcknowledgeTypes.FullReceive;
myMessage.AdministrationQueue = new MessageQueue(@".\private$\Ack");

msgQ.Send(myMessage, MessageQueueTransactionType.Single);

Note   See Example 12 in the complete BDAdotNetAsync2 sample code (see top of article for download).

The preceding code example will successfully send the message, but if the message is not received off of the Orders queue within one hour (you may wish to change the time span to a shorter value for easier testing, such as one minute) it will be removed from the Orders queue and a new message will appear in the Administration queue (Ack in this example). If you choose to use these time-out features, you will need to have a process in place for dealing with the messages that have timed out, such as resending at a higher priority or sending the message to a person for direct intervention.

Using Journal Queues

Journal queues have a similar purpose to the acknowledgement mechanism that was discussed earlier: They exist as a way to track the movement of messages in an MSMQ system. Journal queues are system-provided queues that are automatically created, but are not automatically used. Whenever a message is sent or received, it is possible for a copy of that message to be stored into a journal queue as a form of auditing. Each application queue has its own associated journal queue, to hold copies of messages as they are received from the application queue, and each MSMQ machine has a single journal queue that can be used to keep copies of each message that is sent from that machine.

For the first case, keeping copies of messages received from a queue, the UseJournalQueue property of the MessageQueue object must be set to true before the receive actions occur.

Visual Basic .NET

Dim msgQ As New MessageQueue(".\private$\Orders")
Dim myMessage As Message

msgQ.UseJournalQueue = True
myMessage = msgQ.Receive(MessageQueueTransactionType.Single)

Visual C# .NET

//receiving messages using the journal feature
MessageQueue msgQ = new MessageQueue(@".\private$\Orders");
Message myMessage;

//send a little message so that
//we have something to receive
msgQ.Send("Sample", MessageQueueTransactionType.Single);

msgQ.UseJournalQueue = true;
myMessage = msgQ.Receive(MessageQueueTransactionType.Single);

Note   See Example 13 in the complete BDAdotNetAsync2 sample code (see top of article for download).

For sending messages, the UseJournalQueue property of the Message object controls this behavior—for those sends that use a Message object. You can also set the DefaultPropertiesToSend.UseJournalQueue property to ensure that all sends that do not use Message objects will place copies into the journal queue. Note that in the case of sending messages, journaling only occurs if you are sending to a queue located on another machine; for sends to local queues, journaling does not occur. For this example to work, you will need to use two machines, one of which must be capable of running public queues, meaning that it will need to be a Server OS (either Windows NT 4.0 or Windows 2000). From the client machine, you can execute the code shown next to send a message to a public queue on the server machine.

Visual Basic .NET

Dim msgQ As New MessageQueue("homemadesoap\Orders")
Dim myMessage As New Message()

myMessage.Body = String.Format("Sample Message {0}", DateTime.Now())""
myMessage.UseJournalQueue = True
msgQ.Send(myMessage, MessageQueueTransactionType.Single)

msgQ.DefaultPropertiesToSend.UseJournalQueue = True
msgQ.Send("Simple Send", MessageQueueTransactionType.Single)

Visual C# .NET

//sending messages using the journal feature
MessageQueue msgQ = new MessageQueue(@"homemadesoap\Orders");
Message myMessage = new Message();

myMessage.Body = string.Format("Sample Message {0}", DateTime.Now());
myMessage.UseJournalQueue = true;
msgQ.Send(myMessage, MessageQueueTransactionType.Single);

msgQ.DefaultPropertiesToSend.UseJournalQueue = True;
msgQ.Send("Simple Send", MessageQueueTransactionType.Single);

Note   See Example 14 in the complete BDAdotNetAsync2 sample code (see top of article for download).

MSMQ on its own will not remove items from journal queues, but you can write code to receive messages from these queues just like any other queue. One possible use for this feature would be creating a handler for negative acknowledgements. If your administration queue receives a message indicating that a message failed to be delivered or was not received before it timed out, journal queues provide you a way to find and resend that message. You can use the CorrelationID from the acknowledgement message to find the original in the journal queue (matching to the journal message's ID property) and then take that original message and send it again. This is an example of how these two features, acknowledgements and journaling, can be used together to make your messaging application more robust and reliable. The code to follow watches an administration queue (.\private$\ack in this example) for negative acknowledgements (delivery and receipt) and then looks in the local machine's journal queue for that message. Using the ReceiveByID method of the MessageQueue, this can be done without having to scan the entire journal queue. Finally, having found the original message, a new message is created, populated with the body from the original, and then resent.

Note   Once the journal message is received from the journal queue, it is removed from the queue, so the journal queue is not a permanent record of every message sent or received.

Visual Basic .NET

'Processing Negative Acknowledgement Messages
Dim adminQ As New MessageQueue(".\private$\Ack")
Dim journalQ As New MessageQueue(".\Journal$")
Dim msgQ As MessageQueue
Dim myMessage, resendMessage As Message
Dim originalMessage As Message

adminQ.MessageReadPropertyFilter.CorrelationId = True

'Retrieve all properties on journal message
journalQ.MessageReadPropertyFilter.SetAll()

'Infinite Loop, intended to keep running at all times
'to process messages. You could implement some mechanism
'to have it check a control queue for a "stop" message.
Do
    'This method will block until a message is received.
    'Click "Stop Debugging" from the Debug menu
    'to stop execution.
    myMessage = adminQ.Receive()

    Select Case myMessage.Acknowledgment
          'Failed in some way
        Case Acknowledgment.Purged, _
                Acknowledgment.QueueExceedMaximumSize, _
                Acknowledgment.QueuePurged, _
                Acknowledgment.ReachQueueTimeout, _
                Acknowledgment.ReceiveTimeout

            Try
                originalMessage = _
                     
journalQ.ReceiveById(myMessage.CorrelationId)
                msgQ = New MessageQueue("FormatName:" _
                        & originalMessage.DestinationQueue.FormatName())
                originalMessage.TimeToBeReceived = _
                     New TimeSpan(1, 0, 0, 0)
                Dim targetTypes(0) As Type
                targetTypes(0) = GetType(String)

                originalMessage.Formatter = _
                     New XmlMessageFormatter(targetTypes)
                resendMessage = New Message()
                With resendMessage
                    .Body = CStr(originalMessage.Body)
                    .UseJournalQueue = True
                    .TimeToBeReceived = New TimeSpan(1, 0, 0)
                    .AcknowledgeType = AcknowledgeTypes.FullReachQueue _
                           Or 
AcknowledgeTypes.FullReceive
                    .AdministrationQueue = _
                           New 
MessageQueue(".\private$\Ack")
                End With
                msgQ.Send(resendMessage, _ 
            MessageQueueTransactionType.Single)

            Catch e As Exception
                Console.WriteLine(e.Message)
                Pause()
            End Try

        Case Else
            'Other types of acknowledgment
            'don't handle in this code.
    End Select
Loop

Visual C# .NET

//Processing Negative Acknowledgement Messages
MessageQueue adminQ = new MessageQueue(@".\private$\Ack");
MessageQueue journalQ = new MessageQueue(@".\Journal$");
MessageQueue msgQ;

Message myMessage, resendMessage;
Message originalMessage;
adminQ.MessageReadPropertyFilter.CorrelationId = true;

//Retrieve all properties on journal message
journalQ.MessageReadPropertyFilter.SetAll();

/* Infinite Loop, intended to keep running at all times
   * to process messages. You could implement some mechanism
   * to have it check a control queue for a "stop" message.
   */
do
{
   /* this will block until a message is received.
      * Click "Stop Debugging" from the Debug menu
      * to stop execution. */
    myMessage = adminQ.Receive();

   switch(myMessage.Acknowledgment) 
   {
      case Acknowledgment.Purged:
      case Acknowledgment.QueueExceedMaximumSize:
      case Acknowledgment.QueuePurged:
      case Acknowledgment.ReceiveTimeout:
         //failed

         try
         {
            originalMessage =
                    journalQ.ReceiveById(myMessage.CorrelationId);
            msgQ = new MessageQueue("FormatName:"
               + originalMessage.DestinationQueue.FormatName());
            originalMessage.TimeToBeReceived = new 
TimeSpan(1, 0, 0, 0);
            Type[] targetTypes = {typeof(string)};
                
            originalMessage.Formatter =
                    new XmlMessageFormatter(targetTypes);
            resendMessage = new Message();
            resendMessage.Body = 
(string)originalMessage.Body;
            resendMessage.UseJournalQueue = true;
            resendMessage.TimeToBeReceived = new TimeSpan(1, 
0, 0);
            resendMessage.AcknowledgeType =
                    AcknowledgeTypes.FullReachQueue |
                    AcknowledgeTypes.FullReceive;
            resendMessage.AdministrationQueue =
                    new MessageQueue(@".\private$\Ack");
            msgQ.Send(resendMessage,
               MessageQueueTransactionType.Single);
         }
         catch(Exception e)
         {
            Console.WriteLine(e.Message);
            Pause();
         }
         break;

      default:
         //Other types of acknowledgment
         //Don't handle in this code
         break;
   }
}
while(true);

Note   See Example 15 in the complete BDAdotNetAsync2 sample code (see top of article for download).

For the preceding code to work, a message has to produce a negative acknowledgement, and been using journaling when it was originally sent. To facilitate your experimentation with these concepts, the code shown next will create that exact condition, sending a message (specifying acknowledgement and journaling) with a short time out that will therefore fail to be received. Run this code before the previous example if you wish to see the handling code in action.

Visual Basic .NET

Dim msgQ As New MessageQueue("homemadesoap\Orders")
Dim myMessage As New Message()
Dim targetTypes(0) As Type

targetTypes(0) = GetType(String)
Try
    With myMessage
        .Formatter = New XmlMessageFormatter(targetTypes)
        .Body = "This should fail to be received!"
        .UseJournalQueue = True
        .AcknowledgeType = AcknowledgeTypes.NegativeReceive
        .AdministrationQueue = New MessageQueue(".\private$\Ack")
        .TimeToBeReceived = New TimeSpan(0, 1, 0) ' 5 minutes
    End With

    msgQ.Send(myMessage, MessageQueueTransactionType.Single)
Catch e As Exception
    Console.WriteLine(e.ToString)
End Try

Visual C# .NET

MessageQueue msgQ = new MessageQueue(@"homemadesoap\Orders");
Message myMessage = new Message();
Type[] targetTypes = {typeof(string)};

try
{
   myMessage.Formatter = new XmlMessageFormatter(targetTypes);
   myMessage.Body = "This should fail to be received!";
   myMessage.UseJournalQueue = true;
   myMessage.AcknowledgeType = AcknowledgeTypes.NegativeReceive;
   myMessage.AdministrationQueue = new 
MessageQueue(@".\private$\Ack");
   myMessage.TimeToBeReceived = new TimeSpan(0,0,5); //5 minutes
   msgQ.Send(myMessage, MessageQueueTransactionType.Single);
}
catch(Exception e)
{
   Console.WriteLine(e.ToString);
}
Note   See Example 16 in the complete BDAdotNetAsync2 sample code (see top of article for download).

Conclusion

Message Queuing (MSMQ) support for transactions allows you to execute multiple actions against both MSMQ and other resources that are compatible with Microsoft DTC and ensure that all of these actions are treated as a single unit. Combining those transactional abilities with the various tracking features of MSMQ allows you all the flexibility you need to build a robust system for asynchronous messaging. For more information on MSMQ, check out the following references:

  • The MSMQ home page at http://www.microsoft.com/msmq/
  • For an overview of MSMQ functionality within the .NET Framework in the .NET Framework Developer's Guide article, see "Incorporating Messaging into Applications."
  • The prequel to this article, Accessing Message Queues

Keep watching for more technical notes and architectural articles on distributed application development with .NET.

Show:
© 2014 Microsoft