Windows Azure & Java. Что такое Service Bus и как её использовать из Java-приложения. Часть 1. (ru-RU)

Цикл "Windows Azure & Java": 
Windows Azure, приложение на Java "Hello World" с использованием JDK 1.6 и Tomcat 7. 
Windows Azure, приложение на Java "Hello World" с использованием JDK 1.6 и Glassfish 3
Windows Azure, приложение на Java. Service Runtime Library
Вопросы отладки Java-приложений. 
Дополнительные возможности. Remote Desktop, Session Affinity. 
Windows Azure & Java. Основы хранилища. Блобы. Доступ из локального приложения. 
Windows Azure & Java. Основы хранилища. Очереди. Доступ из локального приложения.
Windows Azure & Java. Что такое Service Bus и как её использовать из Java-приложения. Часть 1.
Windows Azure & Java. Что такое Service Bus и как её использовать из Java-приложения. Часть 2. Топики. 

Данная статья является заключительной частью цикла Windows Azure & Java и содержит в себе сведения о том, что такое Service Bus, что такое очереди и топики, а также о том, как использовать Service Bus из Java-приложения.

Рис.1. Создание шины сообщений между клиентом и сервисом с помощью Service Bus

Один из дополнительных компонентов платформы Windows Azure, предоставляющий функциональность безопасного, устойчивого к ошибкам и высокомасштабируемого механизма обмена сообщения, является Service Bus. Service Bus позволяет разработчику объединять в единую инфраструктуру обмена сообщениями несколько сущностей (на рис.1 приведен пример подобного объединения), использовать множество протоколов обмена сообщениями, и делать это достаточно красиво и аккуратно – приложение регистрируется в Service Bus Relay, разработчик/администратор выбирает некоторый уникальный адрес конечной точки входа (следуя конвенциям именования –https://[имяточкивхода].accesscontrol.windows.net/), после чего приложение может инициировать создание исходящего подключения к Service Bus Relay. Далее приложение будет держать этот канал открытым до тех пор, пока не будет выключено или перезагружено, постоянно посылая небольшие heartbeat-пакеты (раз в несколько секунд). Весь траффик, попадающий в Service Bus Relay, будет перенаправлен затем на конечную точку входа в облако и локальное приложение-приемник (которых, к слову, может быть до 25 одновременно на одну конечную точку входа сервиса – при этом Service Bus Relay случайным образом выбирает, какой из приемников-слушателей получит входящий запрос, обеспечивая таким образом балансировку нагрузки. При этом необходимо учитывать, что нет никаких гарантий, что нагрузка будет ровно “размазана” по всем слушателям). 

Service Bus оперирует двумя основными концепциями – Queues (очереди) и Topics (топики). Рассмотрим их немного подробнее.

Очереди

Очереди Service Bus предоставляют модель коммуникаций, называемую brokered messaging communication – при использовании очередей части распределенного приложения осуществляют коммуникации между собой через очередь (что позволяет разрабатывать слабо-связанные системы) – отправитель сообщения кладет сообщение в очередь (очередь следует модели First In, First Out - FIFO), откуда (из начала) его забирает обработчик. Всё происходит асинхронно, поэтому никому ждать, пока кто-то закончит, не требуется. Принцип работы распределенной архитектуры с использованием очереди Service Bus изображен на рис. 2.

Рис.2. Принцип работы распределенной архитектуры с использованием очереди Service Bus

Подобную архитектуру можно использовать при разработке самых разных приложений (это является типичным паттерном в “облачных” приложениях), например для обеспечения коммуникаций между веб и воркер-ролями в приложении в Windows Azure, коммуникаций между локальным и облачным приложением в гибридной модели, а также для связывания нескольких частей распределенного локальногоприложения (таким образом, сценарий локального приложения расширяется в облако, но только в контексте хранения и маршрутизации сообщений). Очереди Service Bus позволяют масштабировать приложения и добиваться высокой степени отказоустойчивости за счёт собственных механизмов – если обработчики сообщения не могут какое-то время обработать сообщения, сообщения будут храниться до определенных пор либо до того момента, как обработчики не вернутся обратно в онлайн и не будут готовы к работе.

Топики

По принципу работы топики похожи на очереди, однако их главной особенностью является множество конечных точек для обработчиков – они называются подписками (Subscriptions), и такая модель называется publish/subscribe messaging communication – каждый потребитель-обработчик “подписывается” на подписку. Каждая подписка имеет доступ ко всем добавленным сообщениям, при этом механизм фильтрации позволяет им определить, доступно ли сообщение через подписку или нет, и если доступно, то обработчик забирает сообщение из подписки аналогичным очередям образом. Таким образом, можно реализовать распространение одного и того же сообщения на несколько подписок. Механизм подписок использует принцип работы FIFO и представлен на рис.3.

Рис.3. Принцип работы подписок (Subscription) Service Bus

Топики и очереди позволяют разрабатывать более слабо связанные приложения, с временной независимостью (когда отправитель и обработчик сообщения не обязаны быть в он-лайн одновременно), балансировкой нагрузки.

Практика

Для того, чтобы начать работу с Service Bus в Windows Azure, необходимо создать собственно именованный сервис (пространство имен), которое будет предоставлять контейнер для Service Bus.

1) Зайдите на портал управления Windows Azure (https://windows.azure.com  )

2) В левом нижнем углу портала управления нажмите Service Bus, Access Control & Caching, после чего нажмите на Service Bus и нажмите в левом верхнем углу портала управления кнопку New. (рис.4).

Рис.4. Интерфейс портала управления Windows Azure, управление Service Bus

3) В появившемся диалоговом окне Create a new Service Namespace введите значение Namespace и нажмите Check Availability – в этот момент введенное значение будет проверено на уникальность в пределах платформы(рис.5).

Рис 5.Интерфейс портала управления Windows Azure, создание контейнера Service Bus

4) Выберите соответствующие значения региона расположения вашего контейнера (лучше, если это будет тот же регион, в котором вы используете ваше приложение) и нажмите Create Namespace. Контейнер создан, дождитесь появления статуса Active(рис.6).

Рис 6.Интерфейс портала управления Windows Azure, список контейнеров Service Bus

Для управления вашей Service Bus вам необходимо получить определенные данные. В панели слева нажмите на Service Bus и выберите созданный только что контейнер (рис.7).

Рис 7.Интерфейс портала управления Windows Azure, список контейнеров Service Bus

В правой панели свойств вы увидите необходимые вам данные – в том числе Default Key. Нажмите на кнопку View рядом с Default Key для отображения ключа доступа.

Если вы хотите, то можете отсюда же создать очередь и в дальнейшем использовать имя созданной очереди в своем приложении. Для этого необходимо выбрать ваш контейнер и нажать кнопку New Queue. В появившемся диалоговом окне введите необходимые данные (время жизни сообщения, время locked-состояния и так далее) и нажмите Ok.

Всё, на этом настройка Service Bus закончена. Можно переходить к приложению.

Практика. Настройка приложения.

Как и в прошлых статьях, приведу лишь общий кусок Java-кода, который готовы к использованию, с комментариями.

 

import java.math.BigDecimal;
import java.util.Calendar;
import java.util.List;

import com.microsoft.windowsazure.services.serviceBus.*;
import com.microsoft.windowsazure.services.serviceBus.models.*;
import com.microsoft.windowsazure.services.core.*;

import javax.xml.datatype.*;
import javax.xml.datatype.DatatypeConstants.Field;

public class TestServiceBusRelay {

    // Все операции по управлению вашим контейнером Service Bus выполняются с
    // помощью класса ServiceBusContract, при создании которого конструктору
    // передаётся
    // конфигурация вашего контейнера - название контейнера, имя issuer и ключ.
    // Все эти данные можно получить из панели свойств Properties вашего
    // контейнера Service Bus на портале Windows Azure.
    // Далее можно воспользоваться функциональностью класса ServiceBusService,
    // предоставляющего управление очередями - создание, удаление и т.д. В
    // данном методе создаётся контейнер Service Bus с именем ahrimansb.

    private static ServiceBusContract createServiceBus(String issuer, String key) {

        Configuration config = ServiceBusConfiguration
                .configureWithWrapAuthentication("ahrimansb", issuer, key);
        ServiceBusContract service = ServiceBusService.create(config);
        return service;
    }

    // В данном методе используется функциональность класса QueueInfo, с помощью
    // которой в данном случае определяется
    // максимальный размер очереди в мегабайтах (указывается максимальный размер
    // в 5Гб). С помощью методов QueueInfo можно настраивать различные параметры
    // ваших очередей, в т.ч. Time To Live для сообщений, максимальный размер и
    // многое другое.

    private static QueueInfo createQueue(String name, ServiceBusContract service) {

        long queueSize = 5120;

        QueueInfo queueInfo = new QueueInfo(name);
        try {
            queueInfo.setMaxSizeInMegabytes(queueSize);
            CreateQueueResult result = service.createQueue(queueInfo);

        } catch (ServiceException e) {
            System.out.print("ServiceException: " + e.getMessage());
        }

        return queueInfo;

    }

    // Для отображения сообщения в объектной модели существует класс
    // BrokeredMessage, объекты которого содержат набор методов для управления
    // сообщением,
    // набор параметров и набор данных. В набор данных можно передать любой
    // сериализуемый объект. Очереди Service Bus имеют ограничение на размер
    // сообщения в 256 мегабайт (заголовок, содержащий свойства - 64 мегабайт
    // однако ограничений как таковых на количество хранимых в очереди сообщений
    // нет, кроме задаваемого вами ограничения-максимального размера очереди.

    private static void putMessageToQueue(String queueName,
            ServiceBusContract service, BrokeredMessage message) {

        try {
            service.sendQueueMessage(queueName, message);
        } catch (ServiceException e) {
            System.out.print("ServiceException: " + e.getMessage());
        }

    }

    // Аналогично сервису очередей хранилища Windows Azure, вы можете
    // использовать два метода для извлечения сообщений из очередей -
    // получение и удаление (ReceiveAndDelete) и "подсматривание" - получение
    // сообщение, но не удаление его из очереди (PeekLock). При использовании
    // метода ReceiveAndDelete при получении очередью запроса на извлечение
    // сообщения, очередь помечает это сообщение как "потреблённое".
    // При использовании метода PeekLock процесс получения дробится на два этапа
    // - когда Service Bus получает запрос на извлечение сообщения, он находит
    // это сообщение, помечает его как locked (в этот момент другие обработчики
    // перестают видеть это сообщение) и возвращает его приложению. После
    // окончания
    // обработки приложением сообщения закрывается второй этап процесса с
    // помощью вызова метода Delete полученного сообщения. После этого сообщение
    // помечается как удаленное.
    // Типичным паттерном опроса очереди на наличие новых сообщений является
    // использование бесконечного цикла while. В данном методе очередь
    // опрашивается постоянно. Если вы хотите
    // ограничить выполнение каким-либо количеством полученных сообщений, вам
    // необходимо реализовать логику с использованием break.

    private static void getMessageFromQueue(String queueName,
            ServiceBusContract service) throws ServiceException {

        // ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
        // opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
        while (true) {
            ReceiveQueueMessageResult result = service
                    .receiveQueueMessage(queueName);
            BrokeredMessage message = result.getValue();
            if (message != null && message.getMessageId() != null) {
                try {
                    System.out
                            .println("Сообщение: "
                                    + message.toString());
                    System.out.println("ID сообщения: "
                            + message.getMessageId());
                    System.out
                            .println("Если вы задали какое-то свойство, его можно получить с помощью метода getProperty(): "
                                    + message.getProperty("CustomProperty"));
                    System.out.println("Сообщение прочитано - удалено.");
                    service.deleteMessage(message);
                } catch (Exception ex) {
                    // если было выброшено исключение, сообщение будет
                    // разблокировано для других обработчиков
                    System.out.println("Исключение!");
                    service.unlockMessage(message);
                }
            } else {
                System.out.println("Больше нет сообщений, прерываем цикл.");
                break;
            }
        }

    }

    public static void main(String args[]) {

        ServiceBusContract service = createServiceBus("owner",
                "WqmNgDFb1mgicPy7eHzx5sLklBS1Qwb8QjIujhFk8P4=");
        QueueInfo queue = createQueue("myqueue", service);
        BrokeredMessage message = new BrokeredMessage(
                "Наше сообщение в очередь.");
        putMessageToQueue("myqueue", service, message);
        try {
            getMessageFromQueue("myqueue", service);

        } catch (ServiceException e) {
            e.printStackTrace();
        }

    }

}

Далее просто запустите ваше приложение. Для этого нажмите ALT+Shift+X либо нажмите соответствующую кнопку в меню в Eclipse (рис. 8).

Рис.8. Запуск простого Java-проекта.

В консоли вы должны увидеть результат.

Обратите внимание, что, если вы получаете исключение ниже, вам необходимо реализовать проверку на существование очереди перед тем, как ее создавать – это исключение значит, что такая очередь уже есть в контейнере.

04.04.2012 11:57:06 com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusExceptionProcessor processCatch
WARNING: com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict
com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict
    at com.sun.jersey.api.client.WebResource.handle(WebResource.java:676)
    at com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74)
    at com.sun.jersey.api.client.WebResource$Builder.put(WebResource.java:533)
    at com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusRestProxy.createQueue(ServiceBusRestProxy.java:265)
    at com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusExceptionProcessor.createQueue(ServiceBusExceptionProcessor.java:188)
    at TestServiceBusRelay.createQueue(TestServiceBusRelay.java:46)
    at TestServiceBusRelay.main(TestServiceBusRelay.java:135)
ServiceException: com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict
Response Body: <Error><Code>409</Code><Detail>Conflict.TrackingId:59992d2d-e47f-40a0-

В этом случае вы можете также просто удалить очередь с помощью портала управления Windows Azure.