基礎

Service Bus のバッファー

Juval Lowy

サンプル コードのダウンロード

2009 年 10 月号のコラム「Service Bus のルーター」(msdn.microsoft.com/magazine/ee335696) では、Windows Azure AppFabric Service Bus で考えられる今後の方向性について説明しました。これは、最高のインターセプターになります。この前回コラムではルーターの機能について説明し、次回はキューについて執筆するとお約束しました。

ところがその後、ルーターとキューのリリースがどちらも Service Bus の 2 回目のリリースまで延期されました。代わりに現時点では、Service Bus のバッファーが提供される予定です。今後のリリースでは、ログ記録、診断など、インストルメンテーションのさまざまなオプションが追加される可能性があります。今後のコラムでは、それらの側面についても紹介していくつもりです。今回は、バッファーの側面について説明し、いくつか高度な Windows Communication Foundation (WCF) プログラミング手法も紹介します。

Service Bus のバッファー

Service Bus では、サービスの名前空間に含まれるすべての URI が、実際にはメッセージングのアドレス指定可能な中継点となります。クライアントは、その中継点にメッセージを送信でき、そこからメッセージをサービスに中継できます。ただし、各中継点はバッファーとしても機能します (図 1 参照)。

Figure 1 Buffers in the Service Bus

図 1 Service Bus のバッファー

バッファーを監視しているサービスがなくても、メッセージは一定期間バッファーに格納されます。この期間は構成可能です。複数のサービスからバッファーを監視できますが、メッセージを明示的に確認してロックしない限り、メッセージを取得できるのはその中の 1 つのサービスのみです。

バッファーを境にクライアントとサービスが分離されるため、両者を同時に実行しておく必要はありません。クライアントは、実際のサービス エンドポイントではなくバッファーとやりとりし、すべてのメッセージが一方向になり、メッセージ呼び出しの結果やエラーを受け取る (組み込みの) 方法はありません。

Service Bus のバッファーは、Microsoft Message Queuing (MSMQ) のキューや WCF キューを使用するサービスと同じものではなく、次に示すようにいくつか重要な違いがあります。

  • Service Bus のバッファーは永続的ではなく、メッセージはメモリに格納されます。つまり、(あまり起こりそうにありませんが) Service Bus 自体で致命的なエラーが発生すると、メッセージが失われるおそれがあります。
  • Service Bus のバッファーではトランザクション処理が行われず、メッセージの送信も受信もトランザクションの一環として実行されることはありません。
  • バッファーでは、長期にわたるメッセージを処理できません。サービスが 10 分以内にバッファーからメッセージを取得しないと、そのメッセージは破棄されます。WCF MSMQ ベースのメッセージにも有効期限はありますが、その期間はずっと長く、既定値は 1 日です。これにより、まったく連携されていない操作やネットワークに接続されていないアプリケーションの範囲が大きく広がります。
  • バッファーにはサイズ制限があり、50 個を超えるメッセージは保持できません。
  • バッファーに格納されるメッセージのサイズにも上限があり、1 つのメッセージにつき 64 KB です。
MSMQ のメッセージにも最大サイズ制限が課せられますが、サイズはかなり大きく、1 つのメッセージにつき 4 MB です。

したがって、バッファーでは、本当の意味で、キューに登録される呼び出しがクラウド上で実現されません。どちらかといえば、接続に融通性がもたらされるため、呼び出しは、キューに登録される呼び出しと一方向非同期呼び出しの中間に位置します。

バッファーが役に立つシナリオは 2 つあります。1 つは、クライアントとサービスの接続が不安定なアプリケーションです。接続と切断が繰り返されても、短時間オフラインだった間のメッセージがバッファーに保持されていれば、処理を続行することができます。もう 1 つのより一般的なシナリオは、クライアントが一方向非同期呼び出しを発行し、応答バッファー (これについては、後半の「応答サービス」で説明します) を使用して、呼び出し結果を処理する場合です。このようなやりとりでは、ネットワーク接続は、ストレージ機能のない融通の利かないネットワーク接続というよりも、柔軟性のあるバンジージャンプ用のひものように見なされます。

バッファーを操作する

バッファー アドレスは一意でなければなりません。1 つのバッファーには 1 つのアドレスのみを関連付けることができ、他のバッファーやサービスが既に使用しているアドレスにはできません。ただし、複数のクライアントやサービスが同じバッファーからメッセージを取得することは可能です。さらに、バッファー アドレス方式には、HTTP か HTTPS のいずれかを使用する必要があります。バッファーからメッセージを送受信するために、Service Bus では System.Messaging と同様の API を提供しています。つまり、メッセージを加工しないでそのまま操作する必要があります。Service Bus の管理者は、サービスやクライアントとは別にバッファーを管理します。各バッファーには、バッファーの動作と有効期間を制御するポリシーが必要です。既定では、Service Bus の管理者は、プログラムからバッファーを作成して管理する呼び出しを行う必要があります。

各バッファー ポリシーは、MessageBufferPolicy クラスのインスタンスを通じて表します (図 2 参照)。

図 2 MessageBufferPolicy クラス

[DataContract]
public class MessageBufferPolicy : ...
{
  public MessageBufferPolicy();
  public MessageBufferPolicy(MessageBufferPolicy policyToCopy);

  public DiscoverabilityPolicy Discoverability
  {get;set;}

  public TimeSpan ExpiresAfter
  {get;set;}

  public int MaxMessageCount
  {get;set;}

  public OverflowPolicy OverflowPolicy
  {get;set;}

  public AuthorizationPolicy Authorization
  {get;set;} 
  
  public TransportProtectionPolicy TransportProtection
  {get;set;}
}

Discoverability ポリシー プロパティは、DiscoverabilityPolicy 型の列挙値で、バッファーが Service Bus のレジストリ (Atom フィード) に含まれるかどうかを制御します。

public enum DiscoverabilityPolicy
{
  Managers,
  ManagersListeners,
  ManagersListenersSenders,
  Public 
}

Discoverability の既定値は DiscoverabilityPolicy.Managers で、承認に関するクレームを管理する必要があることを意味します。値を DiscoverabilityPolicy.Public に設定すると、承認なしにフィードが発行されます。

ExpiresAfter プロパティは、バッファー内のメッセージの有効期限を制御します。既定値は 5 分、最小値は 1 分、指定できる最大値は 10 分です。有効期限を長く構成しようとすると、暗黙のうちに無視されます。

MaxMessageCount プロパティは、バッファー サイズの上限を指定します。このポリシーの既定値はメッセージ 10 個分で、最小値は当然ながら 1 です。既に説明したように、バッファーに格納できるメッセージの最大サイズは 50 個です。これより大きいサイズを構成しようとすると、暗黙のうちに無視されます。

OverflowPolicy プロパティは、単一値から成る列挙値で、次のように定義されます。

public enum OverflowPolicy
{
  RejectIncomingMessage
}

OverflowPolicy は、バッファーが限界に達したときのメッセージの処理方法を制御します。バッファーが限界に達したときとは、バッファーが既にいっぱいになっているときを指します (これは MaxMessageCount で定義されます)。唯一可能なオプションは、メッセージを拒否して、そのメッセージをエラーと共に送信者に送り返すことです。

単一値の列挙は、今後オプションが追加される場合に備えてプレースホルダーとして機能します。たとえば、送信者に通知することなくメッセージを破棄したり、バッファーからメッセージを削除して新しいメッセージを受け取ったりするオプションが考えられます。

最後の 2 つのプロパティは、セキュリティ構成を行います。AuthorizationPolicy プロパティは、クライアントのトークンを承認するかどうかを Service Bus に指示します。

public enum AuthorizationPolicy
{
  NotRequired,
  RequiredToSend,
  RequiredToReceive,
  Required
}

AuthorizationPolicy.Required の既定値では、送信クライアントと受信クライアントの両方を承認する必要があります。

最後に、TransportProtection プロパティは、バッファーに送信されるメッセージの転送セキュリティの最小レベルを規定します。これには、TransportProtectionPolicy 型の列挙値を使用します。

public enum TransportProtectionPolicy
{
  None,
  AllPaths,
}

TransportProtectionPolicy.AllPaths という転送セキュリティが、すべてのバッファー ポリシーの既定値です。これにより、HTTPS アドレスを使用することが義務付けられます。

バッファーを管理するには MessageBufferClient クラスを使用します (図 3 参照)。

図 3 MessageBufferClient クラス

public sealed class MessageBufferClient
{
  public Uri MessageBufferUri
  {get;}

  public static MessageBufferClient CreateMessageBuffer(
    TransportClientEndpointBehavior credential,
    Uri messageBufferUri,MessageBufferPolicy policy);

  public static MessageBufferClient GetMessageBuffer(
    TransportClientEndpointBehavior credential,Uri messageBufferUri);
  public MessageBufferPolicy GetPolicy();
  public void DeleteMessageBuffer();

  // More members   
}

静的メソッドの MessageBufferClient を使用して、MessageBufferClient の承認済みインスタンスを取得します。これを行うには、この静的メソッドに (TransportClientEndpointBehavior 型の) Service Bus の資格情報を渡します。MessageBufferClient を使用するときは、必ず GetMessageBuffer メソッドを呼び出して、Service Bus にバッファーが存在しているかどうかを確認する必要があります。バッファーがなければ、GetMessageBuffer から例外がスローされます。

プログラムからバッファーを作成する方法は次のとおりです。

Uri bufferAddress = 
  new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");

TransportClientEndpointBehavior credential = ...

MessageBufferPolicy bufferPolicy = new MessageBufferPolicy();

bufferPolicy.MaxMessageCount = 12;
bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3);
bufferPolicy.Discoverability = DiscoverabilityPolicy.Public;

MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,
  bufferPolicy);

この例では、バッファー ポリシー オブジェクトのインスタンスを作成して、このポリシーに目的の値を設定します。バッファーをインストールするには、ポリシーと有効な資格情報を指定して、MessageBufferClient の CreateMessageBuffer メソッドを呼び出すだけです。

プログラムから呼び出す代わりに、私が開発した Service Bus Explorer (ルーターに関するコラムで紹介しました。このコラムのサンプル コードはオンラインで入手できます) を使用して、バッファーを表示および変更できます。図 4 に、バッファー アドレスとポリシーのさまざまなプロパティを指定して、新しいバッファーを作成する方法を示します。ほぼ同じようなやり方で、サービスの名前空間内にあるすべてのバッファーを削除することもできます。

Figure 4 Creating a Buffer Using the Service Bus Explorer
図 4 Service Bus Explorer を使用したバッファーの作成

サービスの名前空間のツリーでバッファーを選択し、右側のウィンドウでバッファーのプロパティを操作して、既存のバッファーのポリシーを調査、変更したり、バッファーからメッセージを削除したり、バッファーを削除することもできます (図 5 参照)。

Figure 5 A Buffer in the Service Bus Explorer
図 5 Service Bus Explorer でのバッファー

管理を効率化する

バッファーを作成する際には、バッファーのサイズと有効期間の両方を最大にして、クライアントとサービスとのやりとりに多くの時間を与えるのが最適です。さらに、バッファーを検出可能にして、Service Bus のレジストリにバッファーを表示できるようにすることをお勧めします。バッファーを使用するようになったら、クライアントとサービスはいずれも、バッファーが既に作成されていることを確認し、作成されていなければ続けて作成します。

これらの手順を自動化するために、ServiceBusHelper クラスを作成しました。

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,string secret);
  public static void CreateBuffer(string bufferAddress,string issuer,
    string secret);

  public static void VerifyBuffer(string bufferAddress,string secret);
  public static void VerifyBuffer(string bufferAddress,string issuer,
    string secret);
  public static void PurgeBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential);
  public static void DeleteBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential); 
}

CreateBuffer メソッドは、メッセージの最大容量が 50 個で有効期間が 10 分の検出可能なバッファーを新しく作成します。バッファーが既に存在している場合は古いバッファーを削除します。VerifyBuffer メソッドは、バッファーが存在していることを確認し、存在していなければ新しいバッファーを作成します。PurgeBuffer は、診断中やデバッグ中に、バッファーに格納されたすべてのメッセージを削除するのに使用します。DeleteBuffer は、バッファーを削除します。図 6 に、これらのメソッドの実装の一部を示します。

図 6 バッファーのヘルパー メソッドの一部

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,
    string issuer,string secret)
  {
    TransportClientEndpointBehavior credentials = ...;
    CreateBuffer(bufferAddress,credentials);
  }
  static void CreateBuffer(string bufferAddress,
    TransportClientEndpointBehavior credentials)
  {
    MessageBufferPolicy policy = CreateBufferPolicy();
    CreateBuffer(bufferAddress,policy,credentials);
  }
  static internal MessageBufferPolicy CreateBufferPolicy()
  {
    MessageBufferPolicy policy = new MessageBufferPolicy();                
    policy.Discoverability = DiscoverabilityPolicy.Public;
    policy.ExpiresAfter = TimeSpan.Fromminutes(10);
    policy.MaxMessageCount = 50;

    return policy;
  }
   public static void PurgeBuffer(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     Debug.Assert(BufferExists(bufferAddress,credentials));
     MessageBufferClient client = 
       MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
     MessageBufferPolicy policy = client.GetPolicy();
     client.DeleteMessageBuffer();
        
     MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,policy);
   }
   public static void VerifyBuffer(string bufferAddress,
     string issuer,string secret)
   {
     TransportClientEndpointBehavior credentials = ...;
     VerifyBuffer(bufferAddress,credentials);
   }
   internal static void VerifyBuffer(string bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     if(BufferExists(bufferAddress,credentials))
     {
       return;
     }
     CreateBuffer(bufferAddress,credentials);
   }
   internal static bool BufferExists(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     try
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
       client.GetPolicy();
       return true;
     }
     catch(FaultException)
     {}
      
     return false;
   }
   static void CreateBuffer(string bufferAddress,
     MessageBufferPolicy policy,
     TransportClientEndpointBehavior credentials)
   {   
     Uri address = new Uri(bufferAddress);
     if(BufferExists(address,credentials))
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,address);
       client.DeleteMessageBuffer();
     }  
     MessageBufferClient.CreateMessageBuffer(credentials,address,policy);
   }
}

BufferExists メソッドは、MessageBufferClient の GetPolicy メソッドを使用して、バッファーが存在しているかどうかを確認します。エラーをキャッチした場合は、バッファーが存在していないことを示すと解釈します。バッファーの削除は、バッファーのポリシーをコピーして、バッファーを削除し、コピーしたポリシーを使用して (同じアドレスで) 新しいバッファーを作成することで行います。

メッセージを送受信する

既に説明したように、Service Bus のバッファーでは、WCF メッセージを加工しないでそのまま操作する必要があります。これは、(バッファーを作成または取得するときに取得される) MessageBufferClient の Send メソッドと Retrieve メソッドを使用して行います。

public sealed class MessageBufferClient
{
  public void Send(Message message);
  public void Send(Message message,TimeSpan timeout);

  public Message Retrieve();
  public Message Retrieve(TimeSpan timeout);

  // More members
}

いずれのメソッドもタイムアウトすることになっていて、パラメーターなしのバージョンは、既定値の 1 分でタイムアウトします。送信者にとって、タイムアウトはバッファーがいっぱいのときに待機する長さを意味します。受信者にとって、タイムアウトはバッファーが空のときに待機する長さを意味します。

送信側から未加工のメッセージをバッファーに送信するコードを以下に示します。

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client =   
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);

Message message = Message.CreateMessage(MessageVersion.Default,"Hello");

client.Send(message,TimeSpan.MaxValue);

まず、送信者は資格情報のオブジェクトを作成し、これを使用して MessageBufferClient のインスタンスを取得します。次に、WCF メッセージを作成して、バッファーに送信します。受信側でバッファーから未加工のメッセージを取得するコードを以下に示します。

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client = 
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = client.Retrieve();

Debug.Assert(message.Headers.Action == "Hello");

バッファーを使用するサービス

先ほどのコード スニペットのように未加工の WCF メッセージを使用する機能は、Service Bus が提供する必要があります。にもかかわらず、そのようなプログラミング モデルには、不十分な点がたくさんあります。これは、厄介かつ面倒で、構造化されておらず、オブジェクト指向でもなければタイプ セーフでもありません。これでは、System.Messaging API を使用して MSMQ に対して明示的にプログラミングしていた、WCF 自体より前の時代に逆戻りしてしまいます。また、メッセージの内容を解析して、その要素を活用する必要があります。

さいわい、提供されている基本機能は改良できます。未加工のメッセージを操作する代わりに、この操作を、クライアントとサービス間の構造化された呼び出しに昇格します。これには、かなり低レベルの細かい処理が必要ですが、この処理を一連のヘルパー クラスと共にカプセル化することができました。

構造化され、バッファーを使用する呼び出しをサーバー側で提供するために、BufferedServiceBusHost<T> を記述しました。これは次のように定義します。

// Generic type parameter based host
public class ServiceHost<T> : ServiceHost
{...}

public class BufferedServiceBusHost<T> : ServiceHost<T>,...
{
  public BufferedServiceBusHost(params Uri[] bufferAddresses);
  public BufferedServiceBusHost(
    T singleton,params Uri[] bufferAddresses);

  /* Additional constructors */
}

WCF と MSMQ バインディングを使用した後に BufferedServiceBusHost<T> をモデル化しました。メッセージを取得するバッファーのアドレス (1 つまたは複数) を指定するコンストラクターを用意する必要があります。残りは、いつもの WCF サービス ホストと同じです。

Uri buffer = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(buffer);
host.Open();

WCF サービス ホストがさまざまなキューを備えた複数のエンドポイントを開くことができるのと同じように、コンストラクターに複数のバッファー アドレスを指定して監視することができます。構成ファイルのサービス エンドポイントのセクションで、これらのバッファーのアドレスを指定する必要 (方法) はありません (ただし、設計によっては、バッファー アドレスをアプリケーション設定のセクションから指定できます)。

Service Bus のバッファーとの実際の通信は、未加工の WCF メッセージを使用して行われますが、その処理はカプセル化されます。BufferedServiceBusHost<T> は、図 6 に示している ServiceBusHelper.VerifyBuffer のバッファー ポリシーを使用して、指定されたバッファーが実際に存在していることを確認し、バッファーが存在していない場合はバッファーを作成します。BufferedServiceBusHost<T> は、すべてのパスをセキュリティで保護する既定の転送セキュリティを使用します。また、指定されたサービスのジェネリック型パラメーター T のコントラクトがすべて一方向であることも確認します。つまり、すべてのコントラクトには、(一方向リレー バインドと同じように) 一方向操作のみが含まれます。最後に、BufferedServiceBusHost<T> は、ホストを終了するときに、デバッグ ビルドのみですべてのバッファーを削除して、次のデバッグ セッションがスムーズに開始されるようにします。

BufferedServiceBusHost<T> は、指定されたサービスをローカルにホストすることで動作します。型パラメーター T のサービス コントラクトごとに、BufferedServiceBusHost<T> が IPC (名前付きパイプ) 経由でエンドポイントを追加します。それらのエンドポイントへの IPC バインドは、タイムアウトしないように構成します。

IPC には必ずトランスポート セッションが含まれますが、セッションごとのサービスで MSMQ の動作を模倣することは、呼び出しごとのサービスとして扱われます。キューから削除された各 WCF メッセージは、サービスの新しいインスタンスのように振る舞い、MSMQ バインディングと同様、以前のメッセージと同時に実行される可能性があります。指定されたサービスの種類がシングルトンの場合、BufferedServiceBusHost<T> はこれを優先し、MSMQ バインディングと同様に、すべてのバッファーとエンドポイントにまたがるメッセージをすべて同じサービス インスタンスに送信します。

BufferedServiceBusHost<T> は、別のバックグラウンド ワーカー スレッドで、指定された各バッファーを監視します。メッセージがバッファーに格納されると、BufferedServiceBusHost<T> がこれを取得して、未加工の WCF メッセージを、IPC 上の適切なエンドポイントの呼び出しに変換します。

図 7 に、BufferedServiceBusHost<T> の一部を示します。このコードでは、ほとんどのエラー処理とセキュリティは削除されています。

図 7 BufferedServiceBusHost<T> の一部

public class BufferedServiceBusHost<T> : 
  ServiceHost<T>,IServiceBusProperties 
{
  Uri[] m_BufferAddresses;
  List<Thread> m_RetrievingThreads;
  IChannelFactory<IDuplexSessionChannel>
    m_Factory;
  Dictionary<string,IDuplexSessionChannel> 
    m_Proxies;

  const string CloseAction = 
    "BufferedServiceBusHost.CloseThread";

  public BufferedServiceBusHost(params Uri[] 
    bufferAddresses)
  {
    m_BufferAddresses = bufferAddresses;
    Binding binding = new NetNamedPipeBinding();
    binding.SendTimeout = TimeSpan.MaxValue;

    Type[] interfaces = 
      typeof(T).GetInterfaces();

    foreach(Type interfaceType in interfaces)
    {         
      VerifyOneway(interfaceType);
      string address = 
        @"net.pipe://localhost/" + Guid.NewGuid();
      AddServiceEndpoint(interfaceType,binding,
        address);
    }
    m_Factory = 
      binding.BuildChannelFactory
      <IDuplexSessionChannel>();
    m_Factory.Open();
  }
  protected override void OnOpened()
  {
    CreateProxies();                       
    CreateListeners();
    base.OnOpened();
  }
  protected override void OnClosing()
  {
    CloseListeners();

    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }

    m_Factory.Close();

    PurgeBuffers();

    base.OnClosing();
  }

  // Verify all operations are one-way
  
  void VerifyOneway(Type interfaceType)
  {...}
  void CreateProxies()
  {
    m_Proxies = 
      new Dictionary
      <string,IDuplexSessionChannel>();

    foreach(ServiceEndpoint endpoint in 
      Description.Endpoints)
    {
      IDuplexSessionChannel channel = 
        m_Factory.CreateChannel(endpoint.Address);
      channel.Open();
      m_Proxies[endpoint.Contract.Name] = 
        channel;
    }
  }

  void CreateListeners()
  {
    m_RetrievingThreads = new List<Thread>();

    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {         ?      ServiceBusHelper.VerifyBuffer(
        bufferAddress.AbsoluteUri,m_Credential);
         
      Thread thread = new Thread(Dequeue);

      m_RetrievingThreads.Add(thread);
      thread.IsBackground = true;
      thread.Start(bufferAddress);
    }
  }

  void Dequeue(object arg)
  {
    Uri bufferAddress = arg as Uri;

    MessageBufferClient bufferClient = ?      MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);      
    while(true)
    {
      Message message = 
        bufferClient.Retrieve(TimeSpan.MaxValue);
      if(message.Headers.Action == CloseAction)
      {
        return;
      }
      else
      {
        Dispatch(message);
      }      
    }
  }
   
  
  
  void Dispatch(Message message)
  {
    string contract = ExtractContract(message);
    m_Proxies[contract].Send(message);
  }
  string ExtractContract(Message message)
  {
    string[] elements = 
      message.Headers.Action.Split('/');
    return elements[elements.Length-2];         
  }
  protected override void OnClosing()
  {
    CloseListeners();
    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }
    m_Factory.Close();

    PurgeBuffers();
    base.OnClosing();
  }
  void SendCloseMessages()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      MessageBufferClient bufferClient =                 ?        MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);
      Message message =   
        Message.CreateMessage(
        MessageVersion.Default,CloseAction);
      bufferClient.Send(message);
    }   
  }
  void CloseListeners()
  {
    SendCloseMessages();

    foreach(Thread thread in m_RetrievingThreads)
    {
      thread.Join();
    }
  }   

  [Conditional("DEBUG")]
  void PurgeBuffers()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      ServiceBusHelper.PurgeBuffer(
        bufferAddress,m_Credential);
    }
  } 
}

BufferedServiceBusHost<T> は、ローカルにホストされている IPC のエンドポイントへのプロキシを、m_Proxies というディクショナリに格納します。

Dictionary<string,IDuplexSessionChannel> m_Proxies;

ディクショナリのキーは、エンドポイントのコントラクト型の名前です。

コンストラクターは、指定されたバッファー アドレスを格納してから、リフレクションを使用し、サービスの種類に基づいてすべてのインターフェイスのコレクションを取得します。BufferedServiceBusHost<T> はインターフェイスごとに一方向操作しか含まれないことを確認し、基本クラスの AddServiceEndpoint を呼び出して、そのコントラクト型のエンドポイントを追加します。アドレスは、パイプの名前の GUID を使用した IPC アドレスです。コンストラクターは、IPC バインディングを使用して、IChannelFactory<IDuplexSessionChannel> 型のチャネル ファクトリを構築します。IChannelFactory<T> は、バインディング経由で厳密に型指定されていないチャネルを作成するために使用します。

public interface IChannelFactory<T> : IChannelFactory
{
  T CreateChannel(EndpointAddress to);
  // More members
}

内部ホストとそのすべての IPC のエンドポイントを開いたら、OnOpened メソッドでそれらのエンドポイントへの内部プロキシとバッファーのリスナーを作成します。これらの 2 つの手順が、BufferedServiceBusHost<T> の中核です。プロキシを作成するには、エンドポイントのコレクションを反復処理します。各エンドポイントのアドレスを取得し、IChannelFactory<IDuplexSessionChannel> を使用して、そのアドレスに対するチャネルを作成します。次に、そのチャネル (プロキシ) をディクショナリに格納します。CreateListeners メソッドは、指定されたバッファー アドレスを反復処理します。アドレスごとにバッファーを確認し、ワーカー スレッドを作成して、バッファーのメッセージをキューから取り出します。

Dequeue メソッドは、MessageBufferClient を使用して、無限ループの中でメッセージを取得し、Dispatch メソッドを使用してこれらのメッセージをディスパッチします。Dispatch メソッドは、メッセージから対象のコントラクト名を抽出し、これを使用してプロキシのディクショナリから IDuplexChannel を検索して、IPC 経由でメッセージを送信します。IDuplexChannel は、基になる IPC チャネルでサポートされており、未加工のメッセージを送信する方法を提供します。

public interface IOutputChannel : ...
{
  void Send(Message message,TimeSpan timeout);
  // More members
}
public interface IDuplexSessionChannel : IOutputChannel,...
{}

IPC の呼び出し中にエラーが発生したら、BufferedServiceBusHost<T> はそのエンドポイントに対して管理するチャネルを再作成します (これは図 7 には示していません)。ホストを終了するときは、プロキシを閉じる必要があります。これにより、進行中の呼び出しが完了するまで適切に待機します。問題は、取得しているすべてスレッドを適切に閉じる方法です。というのも、MessageBufferClient.Retrieve はブロック操作で、これを中止するための組み込みの手法がないためです。解決策は、監視している各バッファーに特殊なプライベート メッセージを投稿する方法です。このプライベート メッセージの処理により、取得しているスレッドに終了を通知します。これを行うのが SendCloseMessages メソッドです。CloseListeners メソッドは、そのプライベート メッセージをバッファーに格納してから、リッスンしているすべてのスレッドをまとめて、これらが終了するまで待機します。リッスンしているスレッドを閉じると、内部プロキシへのメッセージ提供が停止し、(進行中の現在の呼び出しがすべて返されたときに) プロキシを閉じると、ホストをシャットダウンする準備が整います。BufferedServiceBusHost<T> は、単純にすべてのスレッドを中止する、適切とはいえない Abort メソッドもサポートしています (これは図 7 には示していません)。

最後に、BufferedServiceBusHost<T> は、IServiceBusProperties インターフェイスをサポートします。これは、以下のように定義しています。

public interface IServiceBusProperties
{
  TransportClientEndpointBehavior Credential
  {get;set;}

  Uri[] Addresses
  {get;}
}

フレームワークを構築する際 (特にバッファーを効率化する際)、いくつかの場所でこのようなインターフェイスが必要でした。クライアントの場合は、BufferedServiceBusClient<T> クラスを記述しました。これは、以下のように定義しています。

public abstract class BufferedServiceBusClient<T> :                          
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties 
{
  // Buffer address from config
  public BufferedServiceBusClient() 
  {}
  // No need for config file
  public BufferedServiceBusClient(Uri bufferAddress);


  /* Additional constructors with different credentials */  
  protected virtual void Enqueue(Action action);
}

BufferedServiceBusClient<T> は、私が作成した HeaderClientBase<T,H> から派生します。これは、情報をメッセージ ヘッダーに渡すために使用するヘルパーのプロキシです。詳細については、2007 年 11 月号のコラム「WCF での同期コンテキスト」(msdn.microsoft.com/magazine/cc163321) を参照してください。

public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T> 
                                              where T : class
{
  protected H Header
  {get;set;}

  // More members
}

この基本クラスの目的は、応答サービスをサポートすることです。応答サービスについてはこの後説明します。バッファーを使用するサービスのプレーンなクライアントでは、その派生は重要ではありません。

BufferedServiceBusClient<T> を、クライアントの構成ファイルと共に使用したり、クライアントの構成ファイルなしで使用したりできます。バッファー アドレスを受け取るコンストラクターでは、構成ファイルは不要です。パラメーターなしのコンストラクターや、エンドポイント名を受け取るコンストラクターでは、構成ファイルに、一方向リレー バインディングを指定するコントラクト型と一致しているエンドポイントを含めます (ただし、一方向リレー バインディングは BufferedServiceBusClient<T> では完全に無視されます)。

BufferedServiceBusClient<T> からプロキシを派生するときは、Channel プロキシを直接使用するのではなく、保護されている Enqueue メソッドを使用する必要があります。

[ServiceContract]
interface IMyContract
{
  [OperationContract(IsOneWay = true)]
  void MyMethod(int number);
}

class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
  public void MyMethod(int number)
  {
    Enqueue(()=>Channel.MyMethod(number));
  }
}

Enqueue メソッドは、Channel プロパティの使用をラップするデリゲート (ラムダ式) を受け取ります。結果はタイプ セーフのままです。図 8 に、BufferedServiceBusClient<T> クラスの一部を示します。

図 8 BufferedServiceBusClient<T> クラスの一部

public abstract class BufferedServiceBusClient<T> :                               
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties where T : class
{
  MessageBufferClient m_BufferClient;

  public BufferedServiceBusClient(Uri bufferAddress) : 
    base(new NetOnewayRelayBinding(),new EndpointAddress(bufferAddress)) 
  {}

  protected virtual void Enqueue(Action action) 
  {
    try
    {
      action();
    }
    catch(InvalidOperationException exception)
    {
      Debug.Assert(exception.Message ==
        "This message cannot support the operation " +
        "because it has been written.");
    }
  }
  protected override T CreateChannel()
  {    
    ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);
    m_BufferClient =  ?      MessageBufferClient.GetMessageBuffer(Credential,m_BufferAddress);

    return base.CreateChannel();   
  }
  protected override void PreInvoke(ref Message request)
  {
    base.PreInvoke(ref request);       
           
    m_BufferClient.Send(request);
  }
  protected TransportClientEndpointBehavior Credential
  {
    get
    {...}
    set
    {...}
  }
}

BufferedServiceBusClient<T> のコンストラクターでは、基本コンストラクターにバッファー アドレスとバインディングを指定します。一方向操作の検証を強制するために、バインディングには必ず一方向リレー バインドを指定します。CreateChannel メソッドは、対象のバッファーが存在していることを確認し、そのバッファーを表す MessageBufferClient を取得します。BufferedServiceBusClient<T> の中核が PreInvoke メソッドです。PreInvoke メソッドは、HeaderClientBase<T,H> の基本クラスである InterceptorClientBase<T> で提供される仮想メソッドです。

public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class
{
  protected virtual void PreInvoke(ref Message request);
  // Rest of the implementation 
}

PreInvoke メソッドでは、クライアントが WCF メッセージをディスパッチする前に簡単に処理できます。BufferedServiceBusClient<T> は、PreInvoke メソッドをオーバーライドし、バッファーのクライアントを使用してメッセージをバッファーに送信します。これにより、クライアントでは構造化プログラミング モデルを維持し、BufferedServiceBusClient<T> が WCF メッセージの操作をカプセル化します。不都合な点として、メッセージを一度しか送信できないことと、ClientBase のルート クラスがメッセージを送信しようとしたときに InvalidOperationException がスローされることが挙げられます。ここで、Enqueue の本領が発揮され、その例外が削除されます。

応答サービス

2007 年 2 月号のコラム「キューに登録する WCF 応答サービスを構築する」(msdn.microsoft.com/magazine/cc163482) では、キューに登録された呼び出しの結果 (またはエラー) を受け取るための唯一の方法は、キューに登録された応答サービスを使用することだと説明しました。また、メッセージ ヘッダーに、論理メソッド ID と応答アドレスを含む応答コンテキスト オブジェクトを渡す方法を紹介しました。

[DataContract]
public class ResponseContext
{
  [DataMember]
  public readonly string ResponseAddress;

  [DataMember]
  public readonly string MethodId;

  public ResponseContext(string responseAddress,string methodId);

  public static ResponseContext Current
  {get;set;}

  // More members 
}

バッファーを処理する場合、これと同じ設計パターンが当てはまります。クライアントは、サービスが応答をバッファーに格納するための、専用の応答バッファーを用意する必要があります。また、クライアントは、MSMQ ベースの呼び出しと同じように、応答アドレスとメソッド ID をメッセージ ヘッダーに渡す必要があります。MSMQ ベースの応答サービスと Service Bus の主な違いは、応答バッファーが Service Bus にも存在していなければならない点です (図 9 参照)。

Figure 9 Service Bus Buffered Response Service 図 9 Service Bus のバッファーを使用する応答サービス

クライアント側の処理効率を高めるために、ClientBufferResponseBase<T> クラスを記述しました。これは、以下のように定義しています。

 

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  protected readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress);

  /* Additional constructors with different credentials */
     
  protected virtual string GenerateMethodId();
}

ClientBufferResponseBase<T> は、BufferedServiceBusClient<T> の特殊なサブクラスで、応答コンテキストをメッセージ ヘッダーに追加します。BufferedServiceBusClient<T> が、InterceptorClientBase<T> からだけでなく HeaderClientBase<T,H> から派生するようにしたのはこのためです。ClientBufferResponseBase<T> は、BufferedServiceBusClient と同じように使用できます (図 10 参照)。

図 10 クライアント側の効率を高める

[ServiceContract]
interface ICalculator
{
  [OperationContract(IsOneWay = true)]
  void Add(int number1,int number2);
}

class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator
{
  public CalculatorClient(Uri responseAddress) : base(responseAddress)
  {}
   
  public void Add(int number1,int number2)
  {
     Enqueue(()=>Channel.Add(number1,number2));
  }
}

ClientBufferResponseBase<T> のサブクラスを使用するのは簡単です。

Uri resposeAddress = 
  new Uri(@"sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/");

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
proxy.Close();

これは、呼び出しを行うクライアントが、呼び出しのディスパッチに使用するメソッド ID を取得するように、クライアント側の応答を管理する場合に役に立ちます。これは、Header プロパティを使用して簡単に実行されます。

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;

図 11 に、ClientBufferResponseBase<T> の実装を示します。ClientBufferResponseBase<T> は、HeaderClientBase<T,H> の PreInvoke メソッドをオーバーライドして、呼び出しごとに新しいメソッド ID を生成し、そのメソッド ID をヘッダーに設定します。

図 11 ClientBufferResponseBase<T> の実装

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  public readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress)
  {
    ResponseAddress = responseAddress;
  }
   
  /* More Constructors */

  protected override void PreInvoke(ref Message request)
  {
    string methodId = GenerateMethodId();
    Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId);         
    base.PreInvoke(ref request);
  }

  protected virtual string GenerateMethodId()
  {
    return Guid.NewGuid().ToString();
  }

  // Rest of the implementation 
}

バッファーを使用するサービスが必要とする処理を効率化して応答サービスを呼び出すために、ServiceBufferResponseBase<T> クラスを記述しました (図 12 参照)。

図 12 ServiceBufferResponseBase<T> クラス

public abstract class ServiceBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class 
{
  public ServiceBufferResponseBase() : 
   base(new Uri(ResponseContext.Current.ResponseAddress))
 {
   Header = ResponseContext.Current;
               
   // Grab the credentials the host was using 

   IServiceBusProperties properties = 
     OperationContext.Current.Host as IServiceBusProperties;
   Credential = properties.Credential;
  }
}

サービスではプレーンな BufferedServiceBusClient<T> を使用して応答をキューに登録することもできますが、ヘッダーから応答バッファーのアドレスを抽出し、なんらかの方法で資格情報を取得して、Service Bus のバッファーにログインすることが必要になります。また、送信呼び出しのヘッダーに、応答コンテキストを指定することも必要です。これらのすべての手順は、ServiceBufferResponseBase<T> を使用して効率化できます。ServiceBufferResponseBase<T> は、基本コンストラクターに、応答コンテキストからのアドレスを指定します。また、そのコンテキストを設定して、送信ヘッダーを作成します。

ServiceBufferResponseBase<T> で想定するもう 1 つの簡略化は、応答しているサービスが、そのホストが使用しているのと同じ資格情報を使用して (そのバッファーからメッセージを取得し)、応答バッファーにメッセージを送信できる点です。そのために、ServiceBufferResponseBase<T> は、そのホストへの参照を操作コンテキストから取得し、ホストの IServiceBusProperties の実装を使用して資格情報を読み取ります。ServiceBufferResponseBase<T> は、それらの資格情報をコピーして、独自に使用します (これは、BufferedServiceBusClient<T> 内で行われます)。もちろん、これにより、最初から BufferedServiceBusHost<T> を使用してサービスをホストすることが義務付けられます。サービスは、ServiceBufferResponseBase<T> からプロキシ クラスを派生し、これを使用して応答する必要があります。たとえば、以下のような応答コントラクトがあるとします。

[ServiceContract]
interface ICalculatorResponse
{
  [OperationContract(IsOneWay = true)]
  void OnAddCompleted(int result,ExceptionDetail error);
}
This would be the definition of the proxy to the response service:
class CalculatorResponseClient :    
  ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    Enqueue(()=>Channel.OnAddCompleted(result,error));
  }
}

図 13 に、クライアントに応答する、バッファーを使用する単純なサービスを示します。

図 13 ServiceBufferResponseBase<T> を使用する

class MyCalculator : ICalculator
{
  [OperationBehavior(TransactionScopeRequired = true)]
  public void Add(int number1,int number2)
  {
    int result = 0;
    ExceptionDetail error = null;
    try
    {
      result = number1 + number2;
    }
     // Don’t rethrow 
    catch(Exception exception)
    {
      error = new ExceptionDetail(exception);
    }
    finally
    {
      CalculatorResponseClient proxy = new CalculatorResponseClient();
      proxy.OnAddCompleted(result,error);
      proxy.Close();
    }
  }
}

すべての応答サービスでは、メッセージ ヘッダーからメソッド ID にアクセスする必要があります (下図参照)。

class MyCalculatorResponse : ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    string methodId = ResponseContext.Current.MethodId;
    ...
  }
}

Service Bus の今後の調査を楽しみにしていてください。

Juval Lowy は、WCF のトレーニングとアーキテクチャ コンサルティングを行う IDesign 社に所属するソフトウェア アーキテクトです。このコラムでは、最近の著書『Programming WCF Services, Third Edition』(O'Reilly、2010 年、英語) を一部引用しています。彼は、シリコン バレー地域の Microsoft Regional Director も務めています。Lowy の連絡先は idesign.net (英語) です。

この記事のレビューに協力してくれた技術スタッフの Jeanne Baker に心より感謝いたします。