C-C++ Code Example: Reading Messages Asynchronously Using a Callback Function

Applies To: Windows 10, Windows 7, Windows 8, Windows 8.1, Windows Server 2008, Windows Server 2008 R2, Windows Server 2012, Windows Server 2012 R2, Windows Server Technical Preview, Windows Vista

The following example includes two application-defined functions that asynchronously read the messages arriving in a specified queue. The first is the callback function, which is invoked when a message is found in the queue or arrives within the specified time-out period, or when that period elapses. The second receives from its caller the format name of the queue to monitor and the length of the monitoring interval, and calls MQReceiveMessage with the callback function specified.

For more information about reading messages asynchronously, see Asynchronous Reading.

An application using these functions must include the Windows.h, Stdio.h, and Mq.h header files.

This example uses two events to signal whether the callback function was invoked because a message was received or because the time-out period elapsed during a pending I/O operation. The handles to these events must be declared globally. The following code declares these handles.

HANDLE g_hTimeOutEvent = NULL;  
HANDLE g_hRecEvent = NULL;  

To process a message or a time-out event in the callback function

  1. If the HRESULT passed to the callback function indicates that a message was successfully received, retrieve and display the application-specific data transmitted in the PROPID_M_APPSPECIFIC property of the message and signal the event indicating that a message was found and received.

Note

The callback function uses only two of its parameters: the HRESULT value and the pointer to the MQMSGPROPS structure.

  2. If the HRESULT passed to the callback function indicates that the time-out interval elapsed during the I/O operation, signal the event indicating that a time-out occurred.

  3. Free the memory allocated for the MQMSGPROPS structure and the arrays associated with it.

    VOID CALLBACK fnReceiveCallback(  
                                    HRESULT hr,  
                                    QUEUEHANDLE hQueue,  
                                    DWORD dwTimeOut,  
                                    DWORD dwAction,  
                                    MQMSGPROPS * pmsgprops,  
                                    LPOVERLAPPED pov,  
                                    HANDLE hCursor  
                                    )  
    {  
      if (SUCCEEDED(hr))  
      {  
        // Retrieve the application-specific data in the message.
        // PROPID_M_APPSPECIFIC is a VT_UI4 property, so read it through ulVal.
        ULONG ulAppSpecific = pmsgprops->aPropVar[0].ulVal;
        wprintf(L"Message received. Application-specific data: %lu\n",
                ulAppSpecific
                );
        SetEvent(g_hRecEvent);  
      }  
      else if (hr == MQ_ERROR_IO_TIMEOUT)  
      {  
        SetEvent(g_hTimeOutEvent);  
      }  
      else  
      {  
        wprintf(L"The I/O operation failed. Error: 0x%lX\n", hr);
      }  
    
      // Free the memory allocated for the message properties structure.
      // The three arrays were allocated with new[], so release them with delete[].
      if (pmsgprops)
      {
        delete[] pmsgprops->aStatus;
        delete[] pmsgprops->aPropVar;
        delete[] pmsgprops->aPropID;
        delete pmsgprops;
      }
    }  
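
The cleanup in the callback has to mirror the way the caller allocated the structure: arrays created with new[] are released with delete[], and the structure itself with delete. The following is a minimal sketch of a matched allocate/free pair, not part of the original sample. It uses simplified stand-in types so that it compiles without Mq.h (the real MQMSGPROPS, MSGPROPID, and MQPROPVARIANT are declared there), and the names AllocMsgProps, FreeMsgProps, and DemoAllocFree are hypothetical. It also assumes new (std::nothrow), because a plain new throws on failure instead of returning NULL.

```cpp
#include <cassert>
#include <cstdint>
#include <new>      // std::nothrow

// Stand-ins for the Mq.h types (assumption: simplified for illustration only).
typedef std::uint32_t MsgPropIdT;
struct PropVariantT { std::uint16_t vt; std::uint32_t ulVal; };
struct MsgPropsT
{
  std::uint32_t cProp;     // Number of message properties
  MsgPropIdT*   aPropID;   // Message property identifiers
  PropVariantT* aPropVar;  // Message property values
  std::int32_t* aStatus;   // Error reports
};

// Releases a structure built by AllocMsgProps. Safe to call with nullptr.
void FreeMsgProps(MsgPropsT* p)
{
  if (p == nullptr) return;
  delete[] p->aStatus;     // Arrays from new[] need delete[].
  delete[] p->aPropVar;
  delete[] p->aPropID;
  delete p;                // The structure itself came from plain new.
}

// Allocates the structure and its arrays for cProps properties.
// Returns nullptr on failure without leaking a partial allocation.
MsgPropsT* AllocMsgProps(std::uint32_t cProps)
{
  MsgPropsT* p = new (std::nothrow) MsgPropsT();  // Value-initialized to zero
  if (p == nullptr) return nullptr;

  p->aPropID  = new (std::nothrow) MsgPropIdT[cProps]();
  p->aPropVar = new (std::nothrow) PropVariantT[cProps]();
  p->aStatus  = new (std::nothrow) std::int32_t[cProps]();
  if (!p->aPropID || !p->aPropVar || !p->aStatus)
  {
    FreeMsgProps(p);       // delete[] on a null member is a no-op.
    return nullptr;
  }
  p->cProp = cProps;
  return p;
}

// Quick self-check of the pair above.
void DemoAllocFree()
{
  MsgPropsT* p = AllocMsgProps(1);
  assert(p != nullptr && p->cProp == 1);
  FreeMsgProps(p);
}
```

In the sample itself the allocation happens in the receiving function and the release happens in the callback, so whichever side frees the structure must use the same pairing.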
    

To receive messages asynchronously using a callback function

  1. Validate the input string provided by the caller in the application. This string will be used as the format name to open the queue to be monitored and obtain a queue handle.

  2. Create two event objects in the nonsignaled state and specify the handles to them in an array of handles to event objects.

  3. Call MQOpenQueue to open the queue with receive access. Receive access allows the application to peek at or remove the messages in the queue.

  4. In a loop, create and initialize an MQMSGPROPS structure on the heap so that the callback function can release the memory allocated for it. In this example, the message properties structure is initialized for retrieving only PROPID_M_APPSPECIFIC, but your application can retrieve any number of message properties. Then call ResetEvent to set each of the event objects to the nonsignaled state and receive a message by calling MQReceiveMessage with a callback function specified.

  5. If the call to MQReceiveMessage immediately returns MQ_OK (a message was found in the queue) or MQ_INFORMATION_OPERATION_PENDING (no message was found in the queue), call WaitForMultipleObjects to wait for an event indicating that a message was received or that the time-out interval supplied by the caller elapsed. Before it starts waiting, the function is free to perform other tasks while the callback function does its work.
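
Step 5 treats both MQ_OK and MQ_INFORMATION_OPERATION_PENDING as "the receive is under way". This works because an HRESULT carries success or failure in its sign bit, so an informational code passes SUCCEEDED while an error code such as MQ_ERROR_IO_TIMEOUT does not. The sketch below reproduces those tests with stand-in definitions so that it compiles without the Windows headers. The numeric values are quoted from memory of Mq.h and should be treated as assumptions to check against your SDK, and ReceiveIsInFlight is a hypothetical helper.

```cpp
#include <cassert>
#include <cstdint>

typedef std::int32_t HResultT;  // Stand-in for HRESULT (a signed 32-bit value)

// Stand-ins for Mq.h constants (assumed values; verify against your headers).
const HResultT kMqOk             = 0;
const HResultT kMqInfoOpPending  = static_cast<HResultT>(0x400E0006u);
const HResultT kMqErrorIoTimeout = static_cast<HResultT>(0xC00E001Bu);

// Same tests that the SUCCEEDED and FAILED macros perform.
inline bool Succeeded(HResultT hr) { return hr >= 0; }
inline bool Failed(HResultT hr)    { return hr < 0; }

// True when the receive either completed or is pending, which are the two
// cases in which the caller should go on to wait for the events.
bool ReceiveIsInFlight(HResultT hr)
{
  return hr == kMqOk || hr == kMqInfoOpPending;
}

// Quick self-check of the classification.
void DemoHresultChecks()
{
  assert(Succeeded(kMqInfoOpPending));   // Informational, not an error
  assert(Failed(kMqErrorIoTimeout));     // Severity bit set
  assert(ReceiveIsInFlight(kMqOk));
}
```

Because the pending code is a success code, a plain FAILED(hr) test after the call does not mistake a pending receive for an error.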

Code Example

The following code example can be run on all versions of Message Queuing.

HRESULT AsyncRead(  
                  WCHAR * wszFormatName,  
                  DWORD dwTimeOut  
                  )  
{  
  HANDLE hEvents[2] = {NULL, NULL};  
  QUEUEHANDLE hQueue = NULL;  
  DWORD dwStatus = 0;  
  const int NUMBEROFPROPERTIES = 1;  
  MQMSGPROPS * pmsgprops = NULL;  
  DWORD cPropId = 0;  
  HRESULT hr = MQ_OK;  
  
  // Validate the format name input parameter.
  if (wszFormatName == NULL)  
  {  
    return MQ_ERROR_INVALID_PARAMETER;  
  }  
  
  // Create two event objects in the nonsignaled state.  
  g_hRecEvent = CreateEvent(NULL, TRUE, FALSE, NULL);  
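  if (g_hRecEvent == NULL)
  {
    return HRESULT_FROM_WIN32(GetLastError());
  }
  g_hTimeOutEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  if (g_hTimeOutEvent == NULL)
  {
    // Release the first event before reporting the failure.
    hr = HRESULT_FROM_WIN32(GetLastError());
    CloseHandle(g_hRecEvent);
    g_hRecEvent = NULL;
    return hr;
  }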
  hEvents[0] = g_hRecEvent;  
  hEvents[1] = g_hTimeOutEvent;  
  
  // Open the queue for reading messages.  
  hr = MQOpenQueue(  
                   wszFormatName,  
                   MQ_RECEIVE_ACCESS,  
                   MQ_DENY_NONE,  
                   &hQueue  
                   );  
  if (FAILED(hr))  
  {  
    return hr;  
  }  
  
  // Continue waiting for messages until the time-out period elapses.  
  while (dwStatus != WAIT_OBJECT_0+1)  
  {  
    hr = MQ_OK;  
  
    // Define an MQMSGPROPS structure on the heap for receiving a single message.  
    pmsgprops = new MQMSGPROPS;  
    if (pmsgprops == NULL)  
    {  
      return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
    memset(pmsgprops, 0, sizeof(MQMSGPROPS));  
  
    pmsgprops->aPropID = new MSGPROPID[NUMBEROFPROPERTIES];      // Message property identifiers  
    if (pmsgprops->aPropID == NULL)  
    {  
        return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
    pmsgprops->aPropVar = new MQPROPVARIANT[NUMBEROFPROPERTIES]; // Message property values  
    if (pmsgprops->aPropVar == NULL)  
    {  
        return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
    pmsgprops->aStatus = new HRESULT[NUMBEROFPROPERTIES];        // Error reports  
    if (pmsgprops->aStatus == NULL)  
    {  
        return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
    cPropId = 0;  
    pmsgprops->aPropID[cPropId] = PROPID_M_APPSPECIFIC; // Property identifier  
    pmsgprops->aPropVar[cPropId].vt = VT_UI4;           // Type indicator  
    cPropId++;  
    pmsgprops->cProp = cPropId;                         // Number of message properties  
  
    // Set the two event objects to the nonsignaled state and receive a message.
    ResetEvent(hEvents[0]);  
    ResetEvent(hEvents[1]);  
    hr = MQReceiveMessage(  
                          hQueue,             // Queue handle  
                          dwTimeOut,          // Time to wait for a message  
                          MQ_ACTION_RECEIVE,  // Receive action (removes the message)
                          pmsgprops,          // Pointer to the MQMSGPROPS structure
                          NULL,               // No OVERLAPPED structure   
                          fnReceiveCallback,  // Callback function  
                          NULL,               // No cursor  
                          MQ_NO_TRANSACTION   // Not in a transaction  
                          );  
  
    if (hr == MQ_OK || hr == MQ_INFORMATION_OPERATION_PENDING)  
    {  
      dwStatus = WaitForMultipleObjects(  
                                2,            // Two events  
                                hEvents,      // Handle to array of event objects  
                                FALSE,        // Wait until either event is signaled.  
                                INFINITE      // Time-out period  
                                );  
  
      switch (dwStatus)  
      {  
        case WAIT_OBJECT_0:  
          wprintf(L"A receive event was signaled.\n");  
          break;  
  
        case WAIT_OBJECT_0+1:  
          wprintf(L"A time-out event was signaled.\n");  
          hr = MQ_ERROR_IO_TIMEOUT;  
          break;  
  
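        default:
          // WAIT_FAILED or an unexpected value: stop instead of looping again.
          hr = HRESULT_FROM_WIN32(GetLastError());
          break;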
      }  
    }  
  
    if (FAILED(hr))  
    {  
      MQCloseQueue(hQueue);  
      return hr;  
    }  
  }  
  
  // Close the queue.  
  hr = MQCloseQueue(hQueue);  
  return hr;  
}
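
When WaitForMultipleObjects is called with bWaitAll set to FALSE, it returns WAIT_OBJECT_0 plus the array index of the signaled handle, or WAIT_FAILED on error. The following sketch decodes that value for the two-event array used in this example. It defines stand-in constants so that it compiles without Windows.h (the values match the documented Win32 definitions), and the names WaitOutcome, ClassifyWait, and DemoClassifyWait are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Stand-ins for the Win32 wait constants.
const std::uint32_t kWaitObject0 = 0x00000000u;  // WAIT_OBJECT_0
const std::uint32_t kWaitTimeout = 0x00000102u;  // WAIT_TIMEOUT
const std::uint32_t kWaitFailed  = 0xFFFFFFFFu;  // WAIT_FAILED

enum class WaitOutcome
{
  MessageReceived,  // hEvents[0], the receive event, was signaled
  ReceiveTimedOut,  // hEvents[1], the time-out event, was signaled
  WaitTimedOut,     // The wait itself timed out (not possible with INFINITE)
  Error             // WAIT_FAILED or any unexpected value
};

// Maps the wait result to its meaning, assuming the receive event is at
// index 0 and the time-out event is at index 1, as in this example.
WaitOutcome ClassifyWait(std::uint32_t dwStatus)
{
  if (dwStatus == kWaitObject0)     return WaitOutcome::MessageReceived;
  if (dwStatus == kWaitObject0 + 1) return WaitOutcome::ReceiveTimedOut;
  if (dwStatus == kWaitTimeout)     return WaitOutcome::WaitTimedOut;
  return WaitOutcome::Error;
}

// Quick self-check of the mapping.
void DemoClassifyWait()
{
  assert(ClassifyWait(kWaitObject0) == WaitOutcome::MessageReceived);
  assert(ClassifyWait(kWaitFailed) == WaitOutcome::Error);
}
```

Treating every unrecognized value as an error keeps the receive loop from allocating a new structure and issuing another receive after a failed wait.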