Export (0) Print
Expand All

C/C++ Code Example: Reading Messages Synchronously

The following example provides an application-defined function that synchronously reads each message in a known queue, removing each message as it is read.

For information about reading messages synchronously, see Synchronous Reading.

To read messages synchronously

  1. Define the maximum number of message properties to be specified and the message property counter.

  2. Define an MQMSGPROPS structure.

  3. Specify the message properties to retrieve. This example retrieves PROPID_M_LABEL, PROPID_M_LABEL_LEN, PROPID_M_BODY, PROPID_M_BODY_SIZE, and PROPID_M_BODY_TYPE.

Note
The maximum label length, including the string-terminating character, is MQ_MAX_MSG_LABEL_LEN (250 Unicode characters). The PROPID_M_LABEL_LEN value must be reset to this maximum before each message is received.

  4. Initialize the MQMSGPROPS structure.

  5. Create the format name of the queue. This format name is used to open the queue. Note that the input strings used to create the format name are validated in this function, but it is the responsibility of the caller to ensure that these strings are null-terminated.

  6. Call MQOpenQueue to open the queue with receive access. Receive access allows the application to peek at or remove the messages in the queue.

  7. In a loop, call MQReceiveMessage to retrieve the messages in the queue. If the body buffer is too small for any of the messages, reallocate memory for it.

  8. Call MQCloseQueue and free the resources.

Code Example

The following code example can be run on all versions of Message Queuing.

// Synchronously reads, and removes, every message in a known queue.
//
// The queue is opened through the direct format name
// "DIRECT=OS:<wszComputerName>\<wszQueueName>". Messages are received one at
// a time with a 1000 ms time-out; the label (if any) is printed for each
// message, and the body is printed when its body type is VT_BSTR. The loop
// ends at the first receive call that fails.
//
// Parameters:
//   wszQueueName    - Null-terminated queue path relative to the computer.
//   wszComputerName - Null-terminated name of the computer hosting the queue.
//                     Null-termination of both strings is the caller's
//                     responsibility; only NULL pointers are rejected here.
//
// Returns:
//   MQ_ERROR_INVALID_PARAMETER      if either input string is NULL.
//   MQ_ERROR_INSUFFICIENT_RESOURCES if a buffer cannot be allocated.
//   The MQOpenQueue error           if the queue cannot be opened.
//   The MQReceiveMessage error      if receiving stops for any reason other
//                                   than the time-out expiring.
//   The MQCloseQueue result         when receiving stopped on the time-out.
HRESULT ReadingDestQueue(
                         WCHAR * wszQueueName,
                         WCHAR * wszComputerName
                         )
{

  // Validate the input strings before acquiring any resource, so that this
  // early return has nothing to release.
  if (wszQueueName == NULL || wszComputerName == NULL)
  {
    return MQ_ERROR_INVALID_PARAMETER;
  }


  // Define the required constants and variables.
  const int NUMBEROFPROPERTIES = 5;
  DWORD cPropId = 0;
  HRESULT hr = MQ_OK;                                 // Return code
  HANDLE hQueue = NULL;                               // Queue handle
  ULONG ulBufferSize = 2;                             // Initial body buffer size in bytes


  // Define an MQMSGPROPS structure.
  MQMSGPROPS msgprops;
  MSGPROPID aMsgPropId[NUMBEROFPROPERTIES];
  MQPROPVARIANT aMsgPropVar[NUMBEROFPROPERTIES];
  HRESULT aMsgStatus[NUMBEROFPROPERTIES];


  // Allocate the initial body buffer. It is grown on demand in the receive loop.
  UCHAR * pucBodyBuffer = static_cast<UCHAR*>(malloc(ulBufferSize));
  if (pucBodyBuffer == NULL)
  {
    return MQ_ERROR_INSUFFICIENT_RESOURCES;
  }
  memset(pucBodyBuffer, 0, ulBufferSize);


  // Specify the message properties to be retrieved. The index of each
  // property is recorded so that later code does not rely on magic numbers.
  const DWORD iLabelLen = cPropId;
  aMsgPropId[cPropId] = PROPID_M_LABEL_LEN;           // Property ID
  aMsgPropVar[cPropId].vt = VT_UI4;                   // Type indicator
  aMsgPropVar[cPropId].ulVal = MQ_MAX_MSG_LABEL_LEN;  // Length of label
  cPropId++;

  WCHAR wszLabelBuffer[MQ_MAX_MSG_LABEL_LEN];         // Label buffer
  aMsgPropId[cPropId] = PROPID_M_LABEL;               // Property ID
  aMsgPropVar[cPropId].vt = VT_LPWSTR;                // Type indicator
  aMsgPropVar[cPropId].pwszVal = wszLabelBuffer;      // Label buffer
  cPropId++;

  const DWORD iBodySize = cPropId;
  aMsgPropId[cPropId] = PROPID_M_BODY_SIZE;           // Property ID
  aMsgPropVar[cPropId].vt = VT_NULL;                  // Type indicator
  cPropId++;

  const DWORD iBody = cPropId;
  aMsgPropId[cPropId] = PROPID_M_BODY;                // Property ID
  aMsgPropVar[cPropId].vt = VT_VECTOR | VT_UI1;       // Type indicator
  aMsgPropVar[cPropId].caub.pElems = pucBodyBuffer;   // Body buffer
  aMsgPropVar[cPropId].caub.cElems = ulBufferSize;    // Buffer size
  cPropId++;

  const DWORD iBodyType = cPropId;
  aMsgPropId[cPropId] = PROPID_M_BODY_TYPE;           // Property ID
  aMsgPropVar[cPropId].vt = VT_NULL;                  // Type indicator
  cPropId++;


  // Initialize the MQMSGPROPS structure.
  msgprops.cProp = cPropId;                           // Number of message properties
  msgprops.aPropID = aMsgPropId;                      // IDs of the message properties
  msgprops.aPropVar = aMsgPropVar;                    // Values of the message properties
  msgprops.aStatus = aMsgStatus;                      // Error reports


  // Create the direct format name:
  //   "DIRECT=OS:" + wszComputerName + "\" + wszQueueName
  // The buffer is sized from the actual string lengths (in characters, plus
  // the separator and the terminating null), so the copies below cannot
  // overrun it.
  const WCHAR wszPrefix[] = L"DIRECT=OS:";
  const size_t cchPrefix = (sizeof(wszPrefix) / sizeof(wszPrefix[0])) - 1;
  const size_t cchComputer = wcslen(wszComputerName);
  const size_t cchQueue = wcslen(wszQueueName);
  const size_t cchFormatName = cchPrefix + cchComputer + 1 + cchQueue + 1;

  WCHAR * wszFormatName = static_cast<WCHAR*>(malloc(cchFormatName * sizeof(WCHAR)));
  if (wszFormatName == NULL)
  {
    free(pucBodyBuffer);
    return MQ_ERROR_INSUFFICIENT_RESOURCES;
  }

  WCHAR * pwszCursor = wszFormatName;
  memcpy(pwszCursor, wszPrefix, cchPrefix * sizeof(WCHAR));
  pwszCursor += cchPrefix;
  memcpy(pwszCursor, wszComputerName, cchComputer * sizeof(WCHAR));
  pwszCursor += cchComputer;
  *pwszCursor++ = L'\\';
  // Copy the queue name together with its terminating null character.
  memcpy(pwszCursor, wszQueueName, (cchQueue + 1) * sizeof(WCHAR));


  // Open the queue with receive access.
  hr = MQOpenQueue(
                   wszFormatName,                      // Format name of the queue
                   MQ_RECEIVE_ACCESS,                  // Access mode
                   MQ_DENY_NONE,                       // Share mode
                   &hQueue                             // OUT: Queue handle
                   );
  // Free the memory that was allocated for the format name string.
  free(wszFormatName);
  wszFormatName = NULL;


  // Handle any error returned by MQOpenQueue.
  if (FAILED(hr))
  {
    free(pucBodyBuffer);
    return hr;
  }

  for ( ; ; )
  {
    // The label length is an in/out value, so it must be reset to the
    // capacity of the label buffer before every receive call.
    aMsgPropVar[iLabelLen].ulVal = MQ_MAX_MSG_LABEL_LEN;
    hr = MQReceiveMessage(
                          hQueue,                     // Queue handle
                          1000,                       // Max time (msec) to wait for a message
                          MQ_ACTION_RECEIVE,          // Receive action
                          &msgprops,                  // Message property structure
                          NULL,                       // No OVERLAPPED structure
                          NULL,                       // No callback function
                          NULL,                       // No cursor handle
                          MQ_NO_TRANSACTION           // Not in a transaction
                          );

    if (hr == MQ_ERROR_BUFFER_OVERFLOW)
    {
      // The body buffer was too small: grow it to the reported body size and
      // retry. realloc's result goes into a temporary so that the current
      // buffer is still owned (and freed below) if the reallocation fails.
      const ULONG ulRequiredSize = aMsgPropVar[iBodySize].ulVal * sizeof(UCHAR);
      UCHAR * pucGrownBuffer = static_cast<UCHAR*>(realloc(pucBodyBuffer, ulRequiredSize));
      if (pucGrownBuffer == NULL)
      {
        hr = MQ_ERROR_INSUFFICIENT_RESOURCES;
        break;
      }
      pucBodyBuffer = pucGrownBuffer;
      ulBufferSize = ulRequiredSize;
      memset(pucBodyBuffer, 0, ulBufferSize);
      aMsgPropVar[iBody].caub.pElems = pucBodyBuffer;
      aMsgPropVar[iBody].caub.cElems = ulBufferSize;
      continue;
    }

    if (FAILED(hr))
    {
      if (hr == MQ_ERROR_IO_TIMEOUT)
      {
        // The time-out expired without a message arriving: the queue is empty.
        wprintf(L"No messages. Closing queue\n");
      }
      else
      {
        wprintf(L"Receive failed (0x%08X). Closing queue\n", static_cast<unsigned int>(hr));
      }
      break;
    }

    // If the message contains a label, print it.
    if (msgprops.aPropVar[iLabelLen].ulVal == 0)
    {
      wprintf(L"Removed message from queue.\n");
    }
    else
    {
      wprintf(L"Removed message '%s' from queue.\n", wszLabelBuffer);
    }


    // If the message body is a string, display it.
    if (msgprops.aPropVar[iBodyType].ulVal == VT_BSTR)
    {
      // The body is not guaranteed to be null-terminated and the buffer may
      // still hold bytes of an earlier, longer message, so print exactly the
      // number of characters reported for this message.
      ULONG cbBody = msgprops.aPropVar[iBodySize].ulVal;
      if (cbBody > ulBufferSize)
      {
        cbBody = ulBufferSize;
      }
      wprintf(L"Body: %.*s",
              static_cast<int>(cbBody / sizeof(WCHAR)),
              reinterpret_cast<WCHAR*>(pucBodyBuffer));
      wprintf(L"\n");
    }
  }


  // Close the queue and free the memory allocated for the body buffer.
  const HRESULT hrClose = MQCloseQueue(hQueue);
  free(pucBodyBuffer);

  // The loop is only left with a failure code in hr. A time-out is the normal
  // "queue drained" outcome, so report the close result in that case; any
  // other failure is returned to the caller instead of being masked.
  if (hr != MQ_ERROR_IO_TIMEOUT)
  {
    return hr;
  }
  return hrClose;
}
Show:
© 2015 Microsoft