Was this page helpful?
Your feedback about this content is important. Let us know what you think.
Additional feedback?
1500 characters remaining
Export (0) Print
Expand All

Receiving Polling-based Data-changed Messages by Using the WCF Channel Model

You can configure the Microsoft BizTalk Adapter for Oracle Database to poll an Oracle database table or view for any data changes. To perform such a polling operation, the adapter periodically executes a SQL query against an Oracle table or view followed by an optional PL/SQL code block. The results of the SQL query are then returned by the Oracle Database adapter to your code as a strongly-typed result set in an inbound POLLINGSTMT operation. For more information about the mechanism used to configure and perform polling on an Oracle database using the Oracle Database adapter, see Receiving Polling-based Data-changed Messages. It is strongly recommended that you read this topic before proceeding.

You configure the Oracle Database adapter to poll and Oracle database table or view by setting binding properties on an instance of OracleDBBinding. In the WCF channel model, you then use this binding to build a channel listener from which you can get an IInputChannel channel to receive the POLLINGSTMT operation from the adapter.

For an overview of how to receive operations using an IInputChannel in WCF, see "Service Channel-Level Programming" at http://go.microsoft.com/fwlink/?LinkId=106053.

The sections in this topic provide information to help you perform polling on Oracle database tables and views using the WCF channel model.

Consuming the POLLINGSTMT request message

The adapter invokes the POLLINGSTMT operation on your code to poll the Oracle database. That is, the adapter sends a POLLINGSTMT request message that you receive over an IInputChannel channel shape. The POLLINGSTMT request message contains the result set of the query specified by the PollingStatement binding property. You can consume the POLLINGSTMT message in one of two ways:

  • To consume the message using node-value streaming you must call the WriteBodyContents method on the response message and pass it an XmlDictionaryWriter that implements node-value streaming.

  • To consume the message using node streaming you can call GetReaderAtBodyContents on the response message to get an XmlReader.

You typically use node-value streaming to consume result sets that contain Oracle LOB data columns.

For more information about the message structure of the POLLINGSTMT operation, see Message Schemas for the Polling Operations.

For more information about how the Oracle Database adapter supports streaming on LOB data, see Streaming and the Oracle Database Adapter.

For more information about implementing node-value streaming in your code to support end-to-end streaming of LOB data, see Streaming Oracle LOB Data Types by Using the WCF Channel Model.

About the Examples Used in this Topic

The example in this topic uses the SCOTT.ACCOUNTACTIVITY table and the SCOTT.ACCOUNT_PKG.PROCESS_ACTIVITY function. A script to generate these artifacts is supplied with the samples. The example performs the following operations:

  • As part of the polling statement, selects all the records from the ACCOUNTACTIVITY table and displays on the console.

  • As part of the post poll statement, the example invokes the PROCESS_ACTIVITY function that moves all the records from ACCOUNTACTIVITY table to ACTIVITYHISTORY table.

  • Subsequent polls on the ACCOUNTACTIVITY table do not return any records. However, if you want the example to return more records as part of the polling operation, you must insert some records in the ACCOUNTACTIVITY table. You can do so by running the more_activity_data.sql script provided with the samples.

For more information about the samples, see Samples.

How Do I Poll an Oracle Database Using an IInputChannel?

To poll an Oracle database table or view to receive data-change messages using the WCF channel model, perform the following steps.

To receive data-changed messages using an IInputChannel

  1. Create a Visual C# project in Visual Studio. For this topic, create a console application.

  2. In the Solution Explorer, add reference to Microsoft.Adapters.OracleDB, Microsoft.ServiceModel.Channels, System.ServiceModel, and System.Runtime.Serialization.

  3. Open the Program.cs file and add the following namespaces:

    • Microsoft.Adapters.OracleDB

    • Microsoft.ServiceModel.Channels

    • System.ServiceModel

    • System.ServiceModel.Description

    • System.ServiceModel.Channels

    • System.Xml

    • System.Runtime.Serialization

    • System.IO

    • Microsoft.ServiceModel.Channels.Common

  4. Create an instance of OracleDBBinding and set the binding properties required to configure polling. At a minimum you must set the InboundOperationType, PollingStatement, and PollingInterval binding properties. For this example, you also set the PostPollStatement binding property. For more information about binding properties used to configure polling, see Receiving Polling-based Data-changed Messages.

    OracleDBBinding binding = new OracleDBBinding();
    binding.InboundOperationType = InboundOperation.Polling;
    binding.PollingInterval = 30;
    binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";
    binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"
    
  5. Create a binding parameter collection and set the credentials.

    ClientCredentials credentials = new ClientCredentials();
    credentials.UserName.UserName = "SCOTT";
    credentials.UserName.Password = "TIGER";
    
    BindingParameterCollection bindingParams = new BindingParameterCollection();
    bindingParams.Add(credentials);
    
  6. Create a channel listener and open it. You create the listener by invoking BuildChannelListener<IInputChannel> method on the OracleDBBinding. You can modify the target namespace for the POLLINGSTMT operation by setting the PollingId property in the connection URI. For more information about the adapter connection URI, see The Oracle Database Connection URI.

    IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);
    listener.Open();
    
  7. Get an IInputChannel channel by invoking the AcceptChannel method on the listener and open it.

    IInputChannel channel = listener.AcceptChannel();
    channel.Open();
    
  8. Invoke Receive on the channel to get the next POLLINGSTMT message from the adapter.

    Message message = channel.Receive();
    
  9. Consume the result set returned by the POLLINGSTMT operation. You can consume the message using either an XmlReader or an XmlDictionaryWriter.

    XmlReader reader = message.GetReaderAtBodyContents();
    
  10. Close the channel when you have completed processing the request.

    channel.Close()
    
    ImportantImportant
    You must close the channel after you have finished processing the POLLINGSTMT operation. Failure to close the channel may affect the behavior of your code.

  11. Close the listener when you are finished receiving data-changed messages.

    listener.Close()
    
    ImportantImportant
    Closing the listener does not close channels created using the listener. You must explicitly close each channel created using the listener.

Example

The following example shows how to configure the Oracle Database adapter to poll Oracle database tables and views and receive the POLLLINGSTMT operation using the WCF channel model. The result set returned in the POLLINGSTMT operation is written to the console by using an XmlReader.

using System;
using System.Collections.Generic;
using System.Text;

// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceModel.Channels;
using Microsoft.Adapters.OracleDB;

// Add this namespace for channel model
using System.ServiceModel.Channels;

using System.Xml;
using System.Runtime.Serialization;
using System.IO;

// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions
using Microsoft.ServiceModel.Channels.Common;

namespace OraclePollingCM
{
    class Program
    {
        static void Main(string[] args)
        {
            Uri connectionUri = new Uri("oracleDB://ADAPTER/");

            IChannelListener<IInputChannel> listener = null;
            IInputChannel channel = null;

            // set timeout to receive POLLINGSTMT message
            TimeSpan messageTimeout = new TimeSpan(0, 0, 30);

            Console.WriteLine("Sample Started");

            try
            {
                // Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the         
                // PollingStatement,and the PostPollStatement.
                OracleDBBinding binding = new OracleDBBinding();
                binding.InboundOperationType = InboundOperation.Polling;
                binding.PollingInterval = 30;
                binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";
                binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";
                
                // Create a binding parameter collection and set the credentials
                ClientCredentials credentials = new ClientCredentials();
                credentials.UserName.UserName = "SCOTT";
                credentials.UserName.Password = "TIGER";

                BindingParameterCollection bindingParams = new BindingParameterCollection();
                bindingParams.Add(credentials);


                Console.WriteLine("Opening listener");
                // get a listener  from the binding
                listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);
                listener.Open();

                Console.WriteLine("Opening channel");
                // get a channel from the listener
                channel = listener.AcceptChannel();
                channel.Open();

                Console.WriteLine("Channel opened -- waiting for polled data");
                Console.WriteLine("Receive request timeout is {0}", messageTimeout);

                // Poll five times with the specified message timeout 
                // If a timeout occurs polling will be aborted
                for (int i = 0; i < 5; i++)
                {
                    Console.WriteLine("Polling: " + i);
                    Message message = null;
                    XmlReader reader = null;
                    try
                    {
                        //Message is received so process the results
                        message = channel.Receive(messageTimeout);
                    }
                    catch (System.TimeoutException toEx)
                    {
                        Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
                        continue;
                    }

                    // Get the query results using an XML reader
                    try
                    {
                        reader = message.GetReaderAtBodyContents();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Exception :" + ex);
                        throw;
                    }

                    // Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console
                    Console.WriteLine("\nPolling data received for request number {0}", i+1);
                    Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");

                    while (reader.Read())
                    {
                        if (reader.IsStartElement())
                        {
                            switch (reader.Name)
                            {
                                case "POLLINGSTMTRECORD":
                                    Console.Write("\n");
                                    break;

                                case "TID":
                                    reader.Read();
                                    Console.Write(reader.ReadString() + "\t");
                                    break;

                                case "ACCOUNT":
                                    reader.Read();
                                    Console.Write(reader.ReadString() + "\t");
                                    break;
                                case "AMOUNT":
                                    reader.Read();
                                    Console.Write(reader.ReadString() + "\t");
                                    break;

                                case "TRANSDATE":
                                    reader.Read();
                                    Console.Write(reader.ReadString() + "\t");
                                    break;

                                default:
                                    break;
                            }
                        }
                    }

                    // return the cursor
                    Console.WriteLine();

                    // close the reader
                    reader.Close();

                    //            To save the polling data to a file you can REPLACE the code above with the following
                    //
                    //            XmlDocument doc = new XmlDocument();
                    //            doc.Load(reader);
                    //            using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))
                    //            {
                    //                doc.WriteTo(writer);
                    //            }
                    message.Close();
                }
                
                Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
                Console.ReadLine();
            }
            catch (TargetSystemException tex)
            {
                Console.WriteLine("Exception occurred on the Oracle Database");
                Console.WriteLine(tex.InnerException.Message);
            }
            catch (ConnectionException cex)
            {
                Console.WriteLine("Exception occurred connecting to the Oracle Database");
                Console.WriteLine(cex.InnerException.Message);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception is: " + ex.Message);
                if (ex.InnerException != null)
                {
                    Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
                }
            }
            finally
            {
                // IMPORTANT: close the channel and listener to stop polling
                if (channel != null)
                {
                    if (channel.State == CommunicationState.Opened)
                        channel.Close();
                    else
                        channel.Abort();
                }

                if (listener != null)
                {
                    if (listener.State == CommunicationState.Opened)
                        listener.Close();
                    else
                        listener.Abort();
                }
            }
        }
    }
}

See Also

© 2014 Microsoft Corporation. All rights reserved.
Show:
© 2015 Microsoft