Using Service Bus for Windows Server from Java with AMQP 1.0

 

Updated: January 27, 2014

Applies To: Service Bus for Windows Server 1.1

Service Bus for Windows Server includes support for the Advanced Message Queuing Protocol (AMQP) version 1.0. AMQP is an efficient, reliable, wire-level messaging protocol for building robust, cross-platform messaging applications. With AMQP, you can use the queuing and publish/subscribe brokered messaging features of Service Bus from a range of platforms over an efficient binary protocol. You can also build applications composed of components written with a mix of languages, frameworks, and operating systems.

Service Bus for Windows Server supports interoperability over AMQP. This section describes how to send a message from a Java client that uses the Java Message Service (JMS) API to a receiving .NET client application.

The topic "Using Service Bus from Java with AMQP 1.0" describes messaging interoperability between .NET and the Java Message Service (JMS). That information also applies to Service Bus for Windows Server, with the following notable differences:

  • Java does not use the Windows certificate store; it maintains its own. You must export the auto-generated Service Bus SSL certificate and import it into the Java certificate store. See the steps that follow this list.

  • A JMS connection string for Service Bus for Windows Server takes the form amqps://username:encoded-password@FQDN/namespace. For example, amqps://mylabs:...@csnb1761007-gu.myDomain.com/AFServer9. The username and password are required in the connection string; otherwise, you cannot create a MessageProducer. The namespace name, however, is optional.

  • When addressing a queue in Service Bus for Windows Server, while creating a MessageProducer or MessageConsumer object, the path must include the namespace name; for example, namespace/queuename. This is different from Microsoft Azure Service Bus, and is required because the namespace name is optional in the connection string.

  • At this time, only Windows authentication is supported.
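The connection string and entity path rules above can be sketched in Java as follows. This is a minimal illustration rather than part of any Service Bus or Qpid API: the class name ServiceBusAddress, its method names, and all sample values (user1, sbhost.example.com, MyNamespace, queue1) are assumptions made for the example. The password is URL-encoded with java.net.URLEncoder, as in the sample later in this topic.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper for illustration; not part of any Service Bus or Qpid API. */
final class ServiceBusAddress {

    private ServiceBusAddress() { }

    /** Builds amqps://username:encoded-password@FQDN. The optional namespace is omitted. */
    static String connectionString(String username, String password, String fqdn) {
        return "amqps://" + username + ":" + encode(password) + "@" + fqdn;
    }

    /** Builds the on-premises entity path, which must be prefixed with the namespace name. */
    static String queuePath(String namespace, String queueName) {
        return namespace + "/" + queueName;
    }

    /** URL-encodes a value so that reserved characters are safe inside the connection string. */
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Placeholder values, assumed for illustration only.
        System.out.println(connectionString("user1", "p@ss/word", "sbhost.example.com"));
        System.out.println(queuePath("MyNamespace", "queue1"));
    }
}
```

With these placeholder values, the password p@ss/word is encoded as p%40ss%2Fword, so the "@" and "/" characters in the password are not confused with the separators of the connection string.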

To export the SSL certificate and import it into the Java certificate store, do the following:

  • Follow step 1 in Exporting Certificates to Client Machines, using the Get-SBAutoGeneratedCA PowerShell cmdlet. For the purposes of this example, assume the certificate file is exported to %temp%\AutoGeneratedCA.cer.

  • Go to %programfiles%\Java\jre7. Verify that both bin\keytool.exe and lib\security\cacerts exist. You must run as Administrator to import a certificate with keytool.exe; otherwise, an Access Denied error is generated.

  • List the current contents of the keystore to confirm that you can read it:

    bin\keytool.exe -list -keystore lib\security\cacerts
    
  • Import the autogenerated Service Bus certificate by running the following command:

    bin\keytool.exe -importcert -alias AppServerGeneratedSBCA -file %temp%\AutoGeneratedCA.cer -keystore lib\security\cacerts -v
    

    You are prompted for the password (the default is “changeit”). If you do not know the password, then you will be unable to perform the import. When the tool asks you whether to trust the certificate, enter Y (Yes).
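After the import, one way to confirm from Java that the certificate is visible is to load the keystore and look up the alias. The following is a minimal sketch using the standard java.security.KeyStore class. It assumes the default keystore password ("changeit") and the alias used in the import command above; the class name TrustStoreCheck is hypothetical. Run it with the same JRE whose cacerts file you modified.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

/** Hypothetical helper for illustration; checks whether an alias exists in a keystore. */
final class TrustStoreCheck {

    private TrustStoreCheck() { }

    /** Returns the cacerts file of the running JRE: java.home/lib/security/cacerts. */
    static Path defaultCacertsPath() {
        return Paths.get(System.getProperty("java.home"), "lib", "security", "cacerts");
    }

    /** Loads the keystore file and reports whether it contains the given alias. */
    static boolean containsAlias(Path keystore, char[] password, String alias)
            throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = Files.newInputStream(keystore)) {
            ks.load(in, password);
        }
        return ks.containsAlias(alias);
    }

    public static void main(String[] args) throws Exception {
        Path cacerts = defaultCacertsPath();
        // Assumes the default password and the alias from the keytool import command.
        boolean found = containsAlias(cacerts, "changeit".toCharArray(), "AppServerGeneratedSBCA");
        System.out.println(cacerts + (found ? " contains " : " does not contain ")
                + "the alias AppServerGeneratedSBCA");
    }
}
```

If the alias is reported as missing, the import may have been applied to a different Java installation than the one running the check.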

The following Java example creates a JMS queue object (which later maps to an existing Service Bus messaging entity) and sends a message to it. The example also contains code that creates a receiver to consume the message; alternatively, you can write the message receiver in a .NET client.

import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;

import javax.jms.*;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URLEncoder;
import java.util.Hashtable;

public class Main {

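    public static void main(String[] args) {
        // With no arguments, use an empty type so that the usage message below is printed.
        String type = args.length > 0 ? args[0] : "";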

        String queueName = "";

        String connectionString = "";

        if (type.compareTo("onprem") == 0)
        {
            String fqdn = args[1];
            String username = args[3];
            String password = args[4];
            queueName = args[2] + "/" + args[5];

            connectionString = "amqps://" + username  + ":" + encode(password) + "@" + fqdn;
        }
        else if (type.compareTo("cloud") == 0)
        {
            String domain = "servicebus.windows.net";
            String namespace = args[1];
            String username = args[2];
            String key = args[3];
            queueName = args[4];

            connectionString = "amqps://" + username  + ":" + encode(key) + "@" + namespace + "." + domain + "";
        }
        else
        {
            System.out.printf("Unknown type %s\n", type);
            System.out.print("SimpleJMS onprem fqdn namespace username pw queuename\n");
            System.out.print("SimpleJMS cloud namespace username key queuename\n");
            return;
        }

        System.out.print("Initial setup\n");
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
        env.put(Context.PROVIDER_URL, "blah.txt");
        env.put("connectionfactory.ServiceBusConnectionFactory", connectionString);
        Context context = null;
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        try
        {
            System.out.print("Creating context\n");
            context = new InitialContext(env);
            System.out.print("Creating connection factory\n");
            connectionFactory = (ConnectionFactory)context.lookup("ServiceBusConnectionFactory");
            System.out.print("Creating connection\n");
            connection = connectionFactory.createConnection();
        }
        catch (Exception e)
        {
            System.out.print("Exception creating context/connection factory/connection\n");
            System.out.print(e.toString());
            return;
        }

        Session session = null;
        MessageProducer producer = null;
        try
        {
            System.out.print("Creating session\n");
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            System.out.print("Creating producer\n");
            producer = session.createProducer(QueueImpl.createQueue(queueName));
        }
        catch (Exception e)
        {
            System.out.print("Exception creating session/producer\n");
            System.out.print(e.toString());
            return;
        }

        try
        {
            System.out.print("Creating message\n");
            TextMessage message = session.createTextMessage();
            message.setStringProperty("JMSXGroupID", queueName);
            message.setText("This is a message body");

            System.out.print("Sending\n");
            producer.send(message);
        }
        catch (Exception e)
        {
            System.out.print("Exception creating message and sending\n");
            System.out.print(e.toString());
            return;
        }

        try
        {
            System.out.print("Cleaning up\n");
            producer.close();
            session.close();
        }
        catch (Exception e)
        {
            System.out.print("Exception closing producer and session\n");
            System.out.print(e.toString());
        }

        Session receiveSession = null;
        MessageConsumer receiveConsumer = null;
        try
        {
            System.out.print("Prepping connection for receive\n");
            connection.start();
            System.out.print("Creating receive session\n");
            receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            System.out.print("Creating consumer\n");
            receiveConsumer = receiveSession.createConsumer(QueueImpl.createQueue(queueName));
        }
        catch (Exception e)
        {
            System.out.print("Exception setting up for receive\n");
            System.out.print(e.toString());
            return;
        }

        try
        {
            while (true)
            {
                Message got = receiveConsumer.receive(10000);
                if (got != null)
                {
                    String body = ((TextMessage)got).getText();
                    System.out.printf("Received message with body '%s'\n", body);
                }
                else
                {
                    System.out.print("Timeout on receive, bailing\n");
                    break;
                }
            }
        }
        catch (Exception e)
        {
            System.out.print("Exception receiving\n");
            System.out.print(e.toString());
            return;
        }

        try
        {
            System.out.print("Cleaning up\n");
            receiveConsumer.close();
            receiveSession.close();
            connection.stop();
            connection.close();
        }
        catch (Exception e)
        {
            System.out.print("Exception closing consumer, session, and connection\n");
            System.out.print(e.toString());
        }

        System.out.print("Done\n");
    }

    public static String encode(String encodee)
    {
        String retval = "";
        try { retval = URLEncoder.encode(encodee, "UTF-8"); } catch (Exception e) { System.out.print("Encoding failed\n"); }
        return retval;
    }
}
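The sample reads its settings from positional arguments without checking how many were supplied, so a short command line fails with an ArrayIndexOutOfBoundsException. A minimal sketch of validating the argument count first, based on the two usage lines that the sample prints, might look like the following. The class name SimpleJmsArgs and its methods are hypothetical and are not part of the original sample.

```java
/** Hypothetical helper for illustration; validates the sample's positional arguments. */
final class SimpleJmsArgs {

    private SimpleJmsArgs() { }

    /** Returns the argument count (including the mode) for a mode, or -1 if the mode is unknown. */
    static int expectedCount(String mode) {
        if ("onprem".equals(mode)) {
            return 6; // onprem fqdn namespace username pw queuename
        }
        if ("cloud".equals(mode)) {
            return 5; // cloud namespace username key queuename
        }
        return -1;
    }

    /** Returns true only when the mode is known and every positional argument is present. */
    static boolean isValid(String[] args) {
        return args != null && args.length > 0 && args.length == expectedCount(args[0]);
    }

    public static void main(String[] args) {
        if (!isValid(args)) {
            System.out.print("SimpleJMS onprem fqdn namespace username pw queuename\n");
            System.out.print("SimpleJMS cloud namespace username key queuename\n");
            return;
        }
        System.out.println("Arguments look complete for mode " + args[0]);
    }
}
```

Calling a check like isValid at the top of main, before any args[n] access, lets the program print the usage text instead of failing on a missing argument.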