エクスポート (0) 印刷
すべて展開

Service Bus キューを使用するアプリケーションの作成

更新日: 2015年3月

この記事では、キューの有効性について説明し、Microsoft Azure の Service Bus を使用するシンプルなキュー ベースのアプリケーションを作成する方法を示します。

ある小売業界のシナリオを考察してみましょう。この業界では、個々の Point of Sale (POS) 端末の販売データを商品の管理システムにルーティングする必要があり、このシステムでデータを使用して在庫の補充時期を決定します。次のソリューションでは、下図に示すように、端末と商品管理システムとの間の通信に Service Bus メッセージングを使用しています。

Service-Bus-Queues-Img1

それぞれの POS 端末は、DataCollectionQueue にメッセージを送信して、販売データをレポートします。これらのメッセージは、商品管理システムによって取得されるまでこのキューに保持されます。このパターンは、しばしば非同期メッセージングと呼ばれます。これは、POS 端末が処理を続行するために商品管理システムからの応答を待つ必要がないためです。

このアプリケーションのセットアップに必要なコードの説明に入る前に、このシナリオで POS 端末から商品管理システムに直接 (同期) 的に通信せずにキューを使用する利点について考察します。

非同期メッセージング パターンでは、プロデューサーとコンシューマーが同時にオンラインである必要はありません。このメッセージング インフラストラクチャでは、受信側がメッセージを受信する準備ができるまで、メッセージを確実に保存します。そのため、分散アプリケーションのコンポーネントが保守などのために意図的に接続解除されたり、クラッシュによって接続解除されたりしても、システム全体に影響は及びません。また、処理を行うアプリケーションは、1 日の特定の時間帯にオンラインにするだけで十分な場合があります。たとえば、この小売業のシナリオでは、営業時間の終了後にのみ商品管理システムをオンラインにする必要があります。

多くのアプリケーション システムでは、負荷は時間帯によって異なりますが、各作業単位で必要とされる処理時間は一定であることが一般的です。仲介のメッセージ プロデューサーおよびコンシューマーがキューを使用するということは、処理を行うアプリケーション (ワーカー) を、負荷のピーク時ではなく負荷が平均的な時間に処理を行うようにプロビジョニングする必要があることを意味します。キューの深さは、受信する負荷に応じて拡大したり縮小したりします。これにより、アプリケーションの負荷に対処するために必要なインフラストラクチャ量の点から、コストが直接的に節約されます。

Service-Bus-Queues-Img2

負荷が増えると、ワーカーのキューから読み出すために追加されるワーカーのプロセス数が増える可能性があります。各メッセージを処理するのは、ワーカー プロセスのうちの 1 つだけです。さらに、このプル ベースの負荷分散では、ワーカーのコンピューターどうしで処理能力が異なっていても、それぞれの最高速度でメッセージをプルするため、ワーカーのコンピューターの最適化が可能になります。このパターンは、しばしば競合コンシューマーのパターンと呼ばれます。

Service-Bus-Queues-Img3

メッセージのプロデューサーとコンシューマーの間を仲介するためにメッセージ キューを使用すると、コンポーネント間に組み込みの疎結合が提供されます。プロデューサーとコンシューマーはお互いを認識しないため、プロデューサーに影響を及ぼさずにコンシューマーをアップグレードできます。また、メッセージング トポロジを、既存のエンドポイントに影響を及ぼさずに発展させることもできます。これについては、パブリッシュとサブスクライブで詳しく説明します。

次のセクションでは、このアプリケーションをビルドするための Service Bus の使用法について説明します。

Service Bus を使用した作業を開始するには、 のアカウントが必要になります。アカウントを取得済みでない場合は、こちらから無料評価版にサインアップできます。Windows Live ID (Microsoft アカウント) を使用してサインインするように求められ、サインインすると、Service Bus アカウントに関連付けられます。終了すると、Service Bus サブスクリプションを新規作成できます。将来的には、Windows Live ID (Microsoft アカウント) にログインするだけで自分のアカウントに関連付けられているすべての Service Bus サブスクリプションにアクセスできます。

サブスクリプションを取得すると、サービスの名前空間を新規作成できます。新しいサービスの名前空間には、すべての Service Bus アカウントを通じて一意の名前を付ける必要があります。サービスの名前空間は、それぞれが、Service Bus エンティティのセットのコンテナーとして機能します。詳細については、TechNet の「 方法:Service Bus Service 名前空間を作成または変更する.

Service Bus サービスの名前空間を使用するには、アプリケーションは、Service Bus のアセンブリ (Microsoft.ServiceBus.dll) を使用する必要があります。このアセンブリは Microsoft Azure SDK の一部であり、Azure SDK ダウンロード ページからダウンロードできます。ただし、Service Bus の NuGet パッケージは、Service Bus API を取得し、Service Bus の依存関係をすべて備えたアプリケーションを構成するうえで最も簡単な方法です。NuGet および Service Bus パッケージの使用方法の詳細については、「NuGet Service Bus パッケージの使用」を参照してください。

Service Bus メッセージング エンティティ (キューおよび、トピックのパブリッシュとサブスクライブ) の管理操作は、NamespaceManager クラスを介して行われます。Service Bus は、Microsoft Azure Active Directory アクセス制御 (アクセス制御サービスまたは ACS) を使用して実装された要求ベースのセキュリティ モデルを使用します。特定のサービスの名前空間の NamespaceManager インスタンスを作成するには、適切な資格情報が必要です。TokenProvider クラスは、既知のトークン プロバイダーを返す組み込みのファクトリ メソッドを持つセキュリティ トークン プロバイダーを表します。ここでは、CreateSharedAccessSignatureTokenProvider メソッドを使用して、共有アクセス署名 (SAS) の資格情報を保持します。その後、NamespaceManager インスタンスが Service Bus サービスの名前空間のベース アドレスおよびトークン プロバイダーで作成されます。

NamespaceManager クラスはメッセージング エンティティを作成、列挙、および削除するメソッドを提供します。次に示すスニペットは、NamespaceManager インスタンスの作成方法と、このインスタンスを使用して DataCollectionQueue キューを作成する方法を示しています。

Uri uri = ServiceBusEnvironment.CreateServiceUri("sb""ingham-blog"string.Empty);
string name = "RootManageSharedAccessKey";
string key = "abcdefghijklmopqrstuvwxyz";
 
TokenProvider tokenProvider = 
    TokenProvider.CreateSharedAccessSignatureTokenProvider(name, key);
NamespaceManager namespaceManager = 
    new NamespaceManager(uri, tokenProvider);
namespaceManager.CreateQueue("DataCollectionQueue");

キューのプロパティの調整に使用できる CreateQueue メソッドのオーバーロードがあることに注意してください。たとえば、キューに送信するメッセージに既定の有効期限 (TTL) 値を設定できます。

たとえばメッセージの送信や受信などの Service Bus エンティティのランタイム操作では、アプリケーションはまず MessagingFactory オブジェクトを作成する必要があります。NamespaceManager クラスと同様、MessagingFactory インスタンスは、サービスの名前空間 のベース アドレスとトークン プロバイダーから作成されます。

 MessagingFactory factory = MessagingFactory.Create(uri, tokenProvider);

Service Bus キューから送受信されるメッセージは、BrokeredMessage クラスのインスタンスです。このクラスは、標準のプロパティ (LabelTimeToLive など)、アプリケーション プロパティを保持するために使用する辞書、および任意のアプリケーション データの本文のセットで構成されています。アプリケーションは、シリアル化が可能な任意のオブジェクトを渡すことで本文を設定できますが (次の例では、営業データを表す SalesData オブジェクトが POS 端末から渡されます)、これによって、DataContractSerializer を使用してオブジェクトがシリアル化されます。または、Stream を使用することもできます。

 BrokeredMessage bm = new BrokeredMessage(salesData);
 bm.Label = "SalesReport";
 bm.Properties["StoreName"] = "Redmond";
 bm.Properties["MachineID"] = "POS_1";

指定のキューにメッセージを送信する最も簡単な方法は、例の DataCollectionQueue では、CreateMessageSender を使用して、MessagingFactory インスタンスから直接 MessageSender オブジェクトを作成する方法です。

MessageSender sender = factory.CreateMessageSender("DataCollectionQueue");
sender.Send(bm);

キューからメッセージを受信する最も簡単な方法は、CreateMessageReceiver を使用して MessagingFactory から直接作成できる MessageReceiver オブジェクトを使用する方法です。メッセージ受信者は、次の 2 種類のモードで操作できます。ReceiveAndDelete および PeekLock。メッセージ受信者の作成時に、CreateMessageReceiver 呼び出しのパラメーターとして ReceiveMode が設定されます。

ReceiveAndDelete モードを使用している場合、受信���単発の操作です。つまり、Service Bus が要求を受信すると、メッセージを使用中とマークし、アプリケーションに返します。ReceiveAndDelete モードは最も簡単なモデルで、万一問題が発生したときにメッセージを処理しないことをアプリケーションが許容できるシナリオで特に効果的です。これを理解するために、コンシューマーが受信要求を発行して、その処理前にクラッシュするシナリオについて考えてみてください。Service Bus がメッセージを使用中とマークしていたために、アプリケーションが再起動してメッセージを再び使用し始めたときには、クラッシュ前に使用されていたメッセージが欠落することになります。

PeekLock モードでは、受信は 2 ステージの操作になり、欠落したメッセージを許容できないアプリケーションをサポートできるようになります。Service Bus が要求を受信すると、次に使用するメッセージを検索し、他のコンシューマーが受信しないようにこのメッセージをロック���てから、これをアプリケーションに返します。アプリケーションは、メッセージの処理が終了 (または後で処理するために確実に保管) すると、受信したメッセージで Complete を呼び出すことにより、受信プロセスの第 2 ステージを完了します。Service Bus は Complete を確認すると、このメッセージを使用中とマークします。

これ以外に、次の 2 とおりの結果になる場合があります。最初は、アプリケーションが何らかの理由でメッセージを処理できない場合に、受信するメッセージで (Complete ではなく) Abandon を呼び出すことができます。こうすると、Service Bus は、メッセージのロックを解除して、同じコンシューマーまたは完了している別のコンシューマーが再び受信できるようにします。2 番目は、ロックに関連付けられているタイムアウトがあり、このタイムアウトが期限切れになる前にアプリケーションがメッセージを処理できない場合 (アプリケーションがクラッシュするなど)、Service Bus がメッセージのロックを解除して再び受信できるようにするというものです。

この場合、アプリケーションがメッセージの処理は終了したが、Complete の要求を発行する前にクラッシュした場合は、アプリケーションが再起動すると、このメッセージはアプリケーションに再配信されることに注意する必要があります。これは、しばしば At Least Once 処理と呼ばれます。これは各メッセージは最低 1 回は処理されるが、特定の状況下では同じメッセージが再配信される場合があることを意味します。処理の重複を許容しないシナリオの場合は、アプリケーションで重複を検出するための追加のロジックが必要です。これはメッセージの MessageId プロパティを基に実行できます。このプロパティの値は、配信の試行を繰り返しても維持されます。これを Exactly Once 処理と呼びます。

次に示すコードは、ReceiveMode 値が明示的に指定されていない場合の既定である、PeekLock モードを使用したメッセージの受信方法と処理方法を説明したものです。

MessageReceiver receiver = factory.CreateMessageReceiver("DataCollectionQueue");
BrokeredMessage receivedMessage = receiver.Receive();
try
{
    ProcessMessage(receivedMessage);
    receivedMessage.Complete();
}
catch (Exception e)
{
    receivedMessage.Abandon();
}

このセクションで前に説明した例では、MessageSender オブジェクトと MessageReceiver オブジェクトを MessagingFactory から直接作成して、それぞれを使用してキューとのメッセージの送受信を行いました。別の方法としては、QueueClient クラスを使用することで、セッションなどのより高度な機能に加えて送受信操作をサポートする方法があります。

QueueClient queueClient = 
     factory.CreateQueueClient("DataCollectionQueue");
queueClient.Send(bm);
            
BrokeredMessage message = queueClient.Receive();
try
{
    ProcessMessage(message);
    message.Complete();
}
catch (Exception e)
{
    message.Abandon();
} 

表示:
© 2015 Microsoft