January 2016

Volume 31 Number 1

ビッグ データ - HDInsight を使用した .NET 開発者向けリアルタイム データ分析

Omid Afnan

あらゆる規模の企業が、大量に集めたデータの価値と、そのデータを活用する必要性に気付き始めています。通常、このような組織がビッグ データ活用を始める場合、自社で集めたビッグ データ資産をバッチ処理するところから着手します。つまり、Web ログ データ、アプリケーションでのユーザーのクリック数、モノのインターネット (IoT) デバイスからのテレメトリなど、人間や機械が集めた大量のデータを収集、集計することから始めます。昨年のコラム (msdn.com/magazine/dn890370) では、HDInsight で Hive を使用して、基本的な Web ログ分析を行う例を取り上げました。ところが、多くの組織は、履歴データから洞察を得るバッチ解析のメリットは活かせても、リアルタイム データの処理になると問題に直面することになり、連続データをリアルタイムに収集、分析、操作する方法には疑問を抱いています。

もちろん、ビッグ データの世界には、こうしたニーズに対応するテクノロジが存在します。Microsoft Azure プラットフォームには、Azure Data Lake や HDInsight といった、強力なビッグ データ ソリューションがあります。また、高度な分散リアルタイム分析が可能な Apache Storm というオープン ソース テクノロジもあります。このテクノロジは、Azure が管理する Apache Big Data サービスである HDInsight をネイティブにサポートします。今回は、ツイートのストリームを処理するという、単純ながらも強力なシナリオを取り上げ、Apache Storm を主要ツールとして使い、リアルタイムの連続分析を実現する方法を説明します。

後ほど説明しますが、この種の開発は、Visual Studio の強力なコード作成/デバッグ ツールを使用することで、他の製品を使用するよりも非常に簡単に行うことができます。HDInsight Tools for Visual Studio (Azure SDK の一部として提供) には、.NET 開発者が使い慣れたコーディングとデバッグの環境が用意されています。これらのツールを使用すると、現在オープン ソースとして提供されている単純なエディターやコマンドライン ツールを使用するよりも、はるかに容易にビッグ データ テクノロジに取り組むことができます。Storm for HDInsight は Java プログラミングを完全にサポートしますが、.NET プログラマ向けにビジネス ロジックの記述 (と再利用) に C# も使用できるようにしています。今回の例では、このような .NET 機能をデモします。

感情追跡シナリオ

新しい傾向を追跡して分析するシナリオは昔からあります。ニュース報道、天候追跡、災害検知などは、クラウド コンピューティングが登場する以前から行われています。ところが、傾向の検出が求められる分野の範囲や、分析に利用できるデータの規模はいずれも、クラウド時代が進むにつれて、想像を絶するほど拡大しています。感情分析については、ソーシャル ネットワーキングが格好の材料になります。Twitter など、API を用意してソーシャル データにアクセスできるようにしているサービスと、HDInsight などの従量課金のビッグ データ プラットフォームを組み合わせれば、規模の大小にかかわらず、あらゆる組織が感情分析に取り組めるようになります。

Twitter を使用した最も単純な感情分析は、ある期間を定めて、特定のトピックやハッシュタグについて、ユーザーがつぶやく回数をカウントすることです。もちろん、1 つの期間 (たとえば 1 分間) だけ分析するのは、1 日中毎分分析して回数の上下を調査するのに比べたら面白味がありません。ある言葉の使用頻度の急上昇を見つけたら、傾向の検出につながるかもしれません。たとえば、台風や地震に関連した言葉を検出したら、自然災害の影響を受けている地域や、災害の過酷さに関する判断材料を、非常に迅速に得られる可能性があります。

こうした分析の基本をデモするために、Twitter からのデータ取得、ツイートの選択、メトリックの計算、ストレージへの保存、結果の公開を行うストリーミング トポロジをセットアップする方法を順を追って紹介します。このトポロジを 図 1 に示します。今回は、単純なキーワード マッチングを使用してツイートを選択しました。計算するメトリックは、選択条件に一致するツイート数です。選択したツイートは SQL データベースに格納します。また、Web サイトにも公開します。これらはすべて、Azure クラウドで現在利用できる、Storm、SQL Server、および Web サイト サービスを使用して実行します。例を紹介した後、この種のストリーミング データ分析の問題を解決するのに利用できる他のテクノロジについてもいくつか取り上げます。

感情分析トポロジ
図 1 感情分析トポロジ

Storm の基本

Storm は、オープン ソースの Apache プロジェクト (storm.apache.org、英語) で、データ ストリームに対するリアルタイム分散コンピューティングを可能にします。これは、ビッグ データ処理ツールの Hadoop エコシステムの一部で、HDInsight で直接サポートされています。Storm のジョブは、タプル形式のデータ ストリームによって接続される処理ノードのグラフとして定義されます。Storm では、このようなグラフを「トポロジ」と呼びます。トポロジは、他のクエリのように終了することはありません。トポロジは、中断されるか、強制終了されるまで実行を継続します。

Azure の管理ポータルで、新しい HDInsight クラスターを作成することができます。このとき、クラスターの種類として Storm を選択します。これで、Azure は、必要な OS、Hadoop、および Storm のコンポーネントをすべて事前に読み込んだコンピューターのクラスターを数分のうちにセットアップします。必要なノード数の選択、さまざまなコアとメモリ サイズの指定、ノード数のスケーリングは、いつでも行うことができます。Hadoop エクスペリエンスを簡素化するという観点からすると、既に、複数のコンピューターの取得と構成に関する時間や作業が大幅に削減されていることになります。

トポロジのコンポーネントを、スパウトとボルトと呼びます。スパウトは、タプルのストリーム (基本的には型と値のペアのセット) を生成します。つまり、スパウトとは一連のコードで、データを収集または生成してから、そのデータをブロックとして出力します。ボルトとは、データ ストリームを利用する側のコード ユニットです。ボルトには、データを処理してクリーンアップするものや、統計を計算するものもあります。場合によっては、ダウンストリームのボルトに対してタプルの別のストリームを出力するボルトもあります。また、データをストレージや他のシステムに書き込むボルトもあります。

こうしたコンポーネントはそれぞれ、多くの並列タスクを実行することができます。これが、Storm のスケーラビリティと信頼性の鍵となっています。開発者は各コンポーネントの並列度合いを指定でき、指定した度合いに応じた数のタスクが Storm によって割り当てられ、スパウトやボルトでロジックが実行されます。Storm は、タスクを管理し、失敗したタスクを自動再開することで、フォールト トレランスを実現します。最後に、特定のトポロジが、一連のワーカー プロセス (実際には実行コンテナー) で実行されます。ワーカーを追加して、トポロジの処理能力を上げることができます。これらの機能によって、Storm のスケーラビリティとフォールト トレランスの実現に不可欠な特性が提供されます。

トポロジは、リアルタイム分析シナリオ全体に必要な処理を実行する必要性に応じて、複雑になっていく可能性があります。アーキテクチャは、コンポーネントの再利用に適しています。しかし、スパウトとボルトの数が増えると、このアーキテクチャが管理と展開に関する難しい問題の原因にもなります。Visual Studio プロジェクトの考え方は、トポロジのインスタンス作成に必要なコードと構成コンポーネントの管理に便利な方法を提供することです。トポロジという考え自体が本質的にグラフィカルなものなので、システムの開発と運用の両面で、トポロジを視覚化できると非常に役立つことがわかります。このことは、図 2 に示す HDInsight tools for Visual Studio の実行ビューで確認できます。

アクティブな Storm トポロジの監視ビュー
図 2 アクティブな Storm トポロジの監視ビュー

Storm のアーキテクチャは Apache Thrift に基づいています。Apache Thrift とは、複数の言語で実装されるサービスの開発を可能にするフレームワークです。多くの開発者が Java を使用してスパウトとボルトを記述していますが、Java の使用は必須ではありません。ライブラリの SCP.Net パッケージを導入すれば、C# を使用してスパウトとボルトを開発できるようになります。このパッケージは HDInsight Tools for Visual Studio のダウンロードに含まれていますが、NuGet を通じてダウンロードすることもできます。

ほぼリアルタイムでのツイートのフィルター処理

それでは、ツイート ストリームのフィルター処理トポロジの作成方法を紹介しながら、各コンポーネントで実際にどのような処理を行うかを見ていきましょう。今回のサンプル トポロジは、1 つのスパウトと 3 つのボルトから構成されます。このトポロジのグラフィカル ビューは、図 2 の HDInsight Tools for Visual Studio の画像に示すとおりです。Azure で実行するために Storm プロジェクトを送信すると、Visual Studio にこのグラフィカル ビューが表示されます。このビューは、時間の経過と共に、システムを流れるイベントの数と、ノードで発生したエラー状態が更新されます。

ここでは、TwitterSpout が、処理対象のツイートのストリームを取得する役割を果たします。TwitterSpout では、Twitter API と通信してツイートを収集した後、ツイートをデータのタプルに変換して、トポロジの残りの部分にストリーミングできるようにします。TwitterBolt はストリームを取り出し、ツイート数を数え、他のデータ ソースから取得したデータとの結合などの集計を行います。このボルトは、実行するビジネス ロジックに基づいて、(場合によっては新しい形式で) 新しいストリームを出力します。AzureSQLBolt コンポーネントと SignalRBroadcastBolt コンポーネントはそれぞれ、このストリームを利用して、Azure がホストする SQLServer データベースと SignalR Web サイトにデータの一部を書き込みます。

今回は C# を使用して Storm ソリューションを作成しているので、開発の単純化と時間短縮に、さまざまな既存のライブラリを利用できます。この例で重要な 2 つのパッケージは、CodePlex の Tweetinvi ライブラリ (bit.ly/1kI9sqV、英語) と NuGet の SCP.Net ライブラリ (bit.ly/1QwICPj、英語) です。

SCP.Net フレームワークを使用すると、Storm プログラミング モデルを扱う際の複雑さがかなり軽減されます。また、SCP.Net には、手動で行う必要がある作業の多くをカプセル化できる基本クラスが用意されています。最初の作業は、Microsoft.SCP.ISCPSpout 基本クラスからの継承です。これにより、スパウトに必要な 3 つの主要メソッド NextTuple、Ack、および Fail を利用できるようになります。NextTuple は、ストリームで次に利用できるデータを出力するか、何も出力しません。このメソッドは、緊密なループの中で Storm から呼び出されます。出力するタプルがない場合は、いくらかのスリープ時間を組み込むようにします。これは、トポロジを連続実行するときに、CPU サイクルを 100% 使用しないようにするための方法の 1 つです。

タプルの "At-Least-Once" (最低 1 回) セマンティクスのような、確実なメッセージ処理を実装する必要がある場合は、Ack メソッドと Fail メソッドを使用して、ボルト間に必要なハンドシェイクを実装します。今回の例では再試行メカニズムを使用しないため、TwitterSpout クラスのプライベート キュー メンバーからツイートを取得しトポロジに出力するコードを使用して、NextTuple メソッドのみを実装しています。

トポロジ内のストリームは、スパウトまたはボルトによって公開されるスキーマとしてキャプチャされます。これらは、トポロジ内のコンポーネント間のコントラクトとして使用します。また、データの送信時には、SCP.Net で使用するシリアル化/シリアル化解除ルールとしても使用します。Context クラスは、スパウトまたはボルトのインスタンスごとの構成情報を格納するために使用します。スパウトから出力するタプルのスキーマを Context に格納しています。このスキーマは、SCP.Net によって使用され、コンポーネントの接続が構築されます。

TwitterSpout クラスの初期化コードを見てみましょう。そのコードの一部を図 3 に示します。

図 3 TwitterSpout クラスの初期化

public TwitterSpout(Context context)
{
  this.context = context;
  Dictionary<string, List<Type>> outputSchema =
    new Dictionary<string, List<Type>>();
  outputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(SerializableTweet) });
  this.context.DeclareComponentSchema(new ComponentStreamSchema(
    null, outputSchema));
  // Specify your Twitter credentials
  TwitterCredentials.SetCredentials(
    ConfigurationManager.AppSettings["TwitterAccessToken"],
    ConfigurationManager.AppSettings["TwitterAccessTokenSecret"],
    ConfigurationManager.AppSettings["TwitterConsumerKey"],
    ConfigurationManager.AppSettings["TwitterConsumerSecret"]);
  // Setup a Twitter Stream
  var stream = Tweetinvi.Stream.CreateFilteredStream();
  stream.MatchingTweetReceived += (sender, args) => { NextTweet(args.Tweet); };
  // Setup your filter criteria
  stream.AddTrack("China");
  stream.StartStreamMatchingAnyConditionAsync();
}

図 3 は、トポロジの開始時に渡されたコンテキストを使用して、スパウトのコンテキストを初期化するコードです。このコンテキストはその後、スキーマ定義を追加して更新します。Dictionary オブジェクトを作成し、ストリーム (DEFAULT_STREAM) の型 ID と、タプルのすべてのフィールドの型リスト (この場合は SerializableTweet) を追加します。これで、コンテキストには、このクラスのタプルを出力したり、TwitterBolt で利用するときに従う必要のあるスキーマ定義が含まれるようになります。

このスニペットの残りの部分は、Twitter ストリームのセットアップです。Tweetinvi パッケージによって、Twitter の REST とストリーミング API の両方の抽象化が提供されます。適切な資格情報をエンコードしたら、使用するソースの種類のインスタンスを作成します。ソースをストリーミングする場合、フィルター処理されたストリーム、サンプリングされたストリーム、およびユーザー ストリームの 3 つの種類から 1 つ選択できます。これらは、すべてのツイートのキーワード フィルター処理、公開されているツイートのランダムなサンプリング、および特定のユーザーに関連したイベントの追跡を行うための、簡素化されたインターフェイスを提供します。ここでは、フィルター処理されたストリームを使用します。このストリームを使用すると、公開されているすべてのツイートから、複数のキーワードのセットのうち 1 つでも使われているツイートをチェックして、選び出すことができます。

Tweetinvi API を使用すると処理が容易になるため、ここで、必要なツイートのフィルター処理をスパウトで行います。TwitterBolt コンポーネントのフィルター処理や、ツイートを操作するために必要なその他の計算や集計も一緒に行うことができます。スパウトでフィルター処理を行うと、トポロジにストリーミングするデータの量を早い段階で減らすことができます。ただし、Storm の威力は、スケールアウトすることで、トポロジ内のどのコンポーネントでも大量のデータを処理できることにあります。Storm は、リソースを追加することで、ほぼ線形のスケーリングを実現します。そのため、ボトルネックが発生したときには、使用するワーカーを増やしてスケールを拡大します。HDInsight では、セットアップ時にクラスターのサイズとノードの種類を選択でき、また、後からノードを追加できるようにすることで、このアプローチをサポートしています。スケールアウト アプローチを使用して、毎秒何百万ものイベントを処理できる Storm クラスターを構築できます。課金は、クラスターで実行しているノード数に応じて行われるため、コストとスケールのトレードオフを意識する必要があります。

図 3 で最後に説明する部分は、条件に一致するツイートを検出したときに呼び出す Tweetinvi ストリーム オブジェクトに対するコールバックの登録です。NextTweet メソッドがそのコールバックです。このメソッドは、提供されたツイートを、前述の TwitterSpout クラスのプライベート キューに追加します。

public void NextTweet(ITweet tweet)
{
  queue.Enqueue(new SerializableTweet(tweet));
}

トポロジのボルトのコーディングも似ています。ボルトを Microsoft.SCP.ISCPBolt クラスから派生し、Execute メソッドを実装する必要があります。ここで、タプルを SCPTuple というジェネリック型として渡します。これは、まず適切な型に変換する必要があります。その後、C# コードを記述して、必要な細部の処理を実行します。この場合、グローバル変数を使用して、ボルトが認識するタプル数のカウントを累積し、カウントとツイート テキストをログに記録します。最後に、新しい型のタプルをダウンストリームのボルトに出力して、利用できるようにします。このコードを次に示します。

public void Execute(SCPTuple tuple)
{
  var tweet = tuple.GetValue(0) as SerializableTweet;
  count++;
  Context.Logger.Info("ExecuteTweet: Count = {0}, Tweet = {1}", count, tweet.Text);
  this.context.Emit(new Values(count, tweet.Text));
}

ボルトの場合、入力スキーマも出力スキーマもセットアップ時に指定しなければなりません。形式は、前に説明したスパウトのスキーマ定義とまったく同じです。outputSchema という Dictionary 変数を定義し、整数型と文字列型の出力フィールドを指定します (図 4 参照)。

図 4 TwitterBolt の入力スキーマと出力スキーマの指定

public TwitterBolt(Context context, Dictionary<string, Object> parms)
{
  this.context = context;
  Dictionary<string, List<Type>> inputSchema =
    new Dictionary<string, List<Type>>();
  inputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(SerializableTweet) });
  Dictionary<string, List<Type>> outputSchema =
    new Dictionary<string, List<Type>>();
  outputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(long), typeof(string) });
  this.context.DeclareComponentSchema(
    new ComponentStreamSchema(inputSchema,
    outputSchema));
}

他のボルトも同じパターンに従いますが、SQL Azure と SignalR 用に特定の API を呼び出します。最後の重要な要素は、コンポーネントと接続を列挙して、トポロジを定義することです。これを行うには、すべてのスパウトとボルトにもう 1 つメソッドを実装する必要があります。それは Get メソッドです。この Get メソッドは、Context 変数を指定して、Storm タスクの起動中に SCPContext から呼び出される、このクラスのオブジェクトのインスタンスを作成します。SCP.Net が C# の子プロセスのインスタンスを作成することになります。この子プロセスは、以下のデリゲート メソッドを使用して、C# のスパウトまたはボルトのタスクを起動します。

return new TwitterSpout(context);

スパウトとボルトを用意したら、今度はトポロジを作成します。ここでも、トポロジを作成するためのクラスとヘルパー関数が SCP.Net によって提供されます。Microsoft.SCP.Topology.Topology­Descriptor から派生したクラスを作成し、GetTopologyBuilder メソッドをオーバーライドします。このメソッドでは、TopologyBuilder 型のオブジェクトを使用します。この型には SetSpout メソッドと SetBolt メソッドが用意されています。この 2 つのメソッドが、対応するコンポーネントの名前と、入力/出力スキーマを指定できるようにします。また、Get デリゲートを使用してコンポーネントを初期化するよう指定することや、最も重要な、現在のコンポーネントに接続するアップストリーム コンポーネントを指定することもできます。図 5 に、トポロジを定義するコードを示します。

図 5 Twitter 分析トポロジの作成

namespace TwitterStream
{
  [Active(true)]
  class TwitterTopology : TopologyDescriptor
  {
    public ITopologyBuilder GetTopologyBuilder()
    {
      TopologyBuilder topologyBuilder = new TopologyBuilder(
        typeof(TwitterTopology).Name + DateTime.Now.ToString("-yyyyMMddHHmmss"));
      topologyBuilder.SetSpout(
        typeof(TwitterSpout).Name,
        TwitterSpout.Get,
        new Dictionary<string, List<string>>()
        {
          {Constants.DEFAULT_STREAM_ID, new List<string>(){"tweet"}}
        },
        1);
      topologyBuilder.SetBolt(
        typeof(TwitterBolt).Name,
        TwitterBolt.Get,
        new Dictionary<string, List<string>>()
        {
          {Constants.DEFAULT_STREAM_ID, new List<string>(){"count", "tweet"}}
        },
        1).shuffleGrouping(typeof(TwitterSpout).Name);
      topologyBuilder.SetBolt(
        typeof(SqlAzureBolt).Name,
        SqlAzureBolt.Get,
        new Dictionary<string, List<string>>(),
        1).shuffleGrouping(typeof(TwitterBolt).Name);
      topologyBuilder.SetBolt(
        typeof(SignalRBroadcastBolt).Name,
        SignalRBroadcastBolt.Get,
        new Dictionary<string, List<string>>(),
        1).shuffleGrouping(typeof(TwitterBolt).Name);
      return topologyBuilder;
    }
  }
}

完全な Twitter 分析プロジェクトは、Visual Studio でプロジェクトの種類に Storm を選択してビルドできます。このプロジェクトを使用すると、必要なさまざまなコンポーネントを単純かつなじみのある方法で簡単にレイアウトして、ソリューション エクスプローラーで表示できます (図 6 参照)。ボルトやスパウトなどのコンポーネントは、プロジェクトのコンテキスト メニューの [追加]、[新しい項目] を使用して追加できます。項目の種類で Storm を選択すると、新しいファイルが追加され、必要なメソッドすべての概要が含まれます。Visual Studio Storm プロジェクトを使用すると、Tweetinvi などのライブラリへの参照を、直接または NuGet を通じて追加できます。Azure で実行するためにトポロジを送信するには、ソリューション エクスプローラーのコンテキスト メニューを 1 回クリックするだけです。必要なコンポーネントすべてが、選択した HDInsight Storm クラスターにアップロードされ、トポロジが送信されます。

ソリューション エクスプローラーからトポロジを送信
図 6 ソリューション エクスプローラーからトポロジを送信

送信すると、図 2 のトポロジ ビューが表示され、トポロジの状態を監視できます。Storm では、アクティブ、非アクティブ、強制終了など、いくつかのトポロジ状態を許可しており、スケーラビリティのパラメーターに応じてワーカーのタスクを再調整できるようになっています。Visual Studio では、これらの状態遷移をすべて管理できるうえ、現在のタプルのフローの監視もできます。コンポーネントを詳細に調査して問題をデバッグするために、エラー状態 (トポロジ ビューの赤い外枠とマーカー) を示している SqlAzureBolt などの個別のコンポーネントまでドリルダウンできます。このボルトをダブルクリックすると、タプルのフローに関する詳細な統計とボルトのエラーの概要が表示されます。エラー ポート リンクをクリックすると、Visual Studio を離れることなく個別のタスクの完全なログに移動することもできます。

今回取り上げている単純なトポロジのコードとプロジェクトは、GitHub の MicrosoftBigData リポジトリで見つけることができます。TwitterStream フォルダーと TwitterStream サンプル プロジェクトを探してください。azure.microsoft.com/ja-jp/documentation/articles/hdinsight-storm-overview/ で追加の記事やサンプルを参照できます。

より複雑な分析への移行

紹介した Storm トポロジの例は、単純なものです。Storm のリアルタイム処理能力を高め、複雑にしていく方法はたくさんあります。

既に述べたように、HDInsight の Storm クラスターに割り当てるリソースの数は、必要に応じてスケールアップできます。図 2 のトポロジについて、Visual Studio の実行時ビューで提供されるデータからシステムのパフォーマンスを把握できます。ここで、出力するタプルの数や、実行基、タスク、遅延などの数を確認できます。図 7 は、Azure の管理ポータル画面です。ここで、現在使用されているノードの数、型とコアのカウントについてさらに詳細に確認できます。この情報に基づいて、クラスターのスケーリングや、スーパーバイザー (ワーカー) ノードをクラスターに追加する判断が可能です。このスケールアップには再起動の必要がなく、Visual Studio のトポロジ ビューまたは管理ポータルで再調整をトリガーすると数分で実行されます。

Storm クラスターの Azure 管理ポータル画面
図 7 Storm クラスターの Azure 管理ポータル画面

ほとんどの分析アプリケーションでは、複数の構造化されていないビッグ データ ストリームを操作します。今回の場合、トポロジには、複数のスパウトと、複数のスパウトからの読み取りが可能な複数のボルトが含まれています。これは、SetBolt メソッドの呼び出しにいくつかの入力を指定することで、簡単にトポロジ構成内で表現できます。しかし、同一のボルトで複数のソースを扱うビジネス ロジックは、個別のタプルがさまざまなストリーム ID で到着するため、より複雑になります。ビジネス問題の複雑さが増すと、処理中にリレーショナル データ ソースや構造化されたデータ ソースも必要になる可能性があります。スパウトはキューのようなデータ ソースに適していますが、リレーショナル データはボルトで処理する方が適切です。ここでも、柔軟性の高いボルトを実装し、C# や Java を使用することで、定評のある API やクエリ言語を使ってデータベースへのアクセスを簡単にコーディングできます。コーディングが複雑になる原因は、この呼び出しが、クラスターの Storm コンテナーからデータベース サーバーにリモートで行われることにあります。SQL Azure と HDInsight は同じ Azure ファブリックで動作し、簡単にやり取りできますが、クラウド ベース サービスという別の選択肢も利用できます。

Storm ランタイムを使用すると、システムのさまざまな細かい動作の設定や調整が可能です。そのような設定の多くは、トポロジやタスク レベルで適用できる構成パラメーターとして表示されます。構成パラメーターには Microsoft.SCP.Topology.StormConfig クラスでアクセスでき、Storm ワークロード全体の調整に使用できます。例としては、スパウトごとの保留タプルの最大数、tick タプル、スパウト スリープ戦略の設定などがあります。トポロジに対する他の変更は、トポロジ ビルダーを使って行うことができます。今回の例のトポロジでは、すべてのコンポーネント間のストリーミングが "シャッフル グルーピング" に設定されています。どのようなコンポーネントでも、Storm の実行システムでは、多数の個別のタスクを作成できますし、実際に作成されます。これらのタスクは、コアやコンテナー間で並列に実行して、複数のリソースのボルトにワークロードを分散できる独立したワーカー スレッドです。開発者は、ボルトから次のボルトにタスクを渡す方法を制御できます。シャッフル グルーピングの選択は、どのタプルを次のボルトのどのワーカー プロセスに渡してもかまわないことを意味します。タプルの特定のフィールド値に基づいてタプルを同じワーカーに送信する "フィールド グルーピング" など、他のオプションを選択することもできます。このオプションは、ツイート ストリームに含まれる特定の言葉の実行カウントなど、状態を持つ操作のデータのフロー制御に使用できます。

最後に、リアルタイム分析システムは、組織内で規模のより大きな分析パイプラインに組み込むことができます。たとえば、Web ログ分析システムには、毎日のように Web サービスのログの処理を行う、大きなバッチ指向の部分が存在する可能性があります。この部分は、Web サイト トラフィックの概要を生成し、パターン検出に適した、軽く集計されたデータをデータ科学者に提供します。この分析に基づいて、システムのエラーや悪意のある使用の検出など、特定の動作に対するリアルタイムのトリガーを作成することに決めるチームもあります。悪意のある使用に関しては、ログや製品利用統計情報のリアルタイム分析が必要ですが、バッチ システムによって毎日更新される参照データを利用する可能性が高いでしょう。このような大規模なパイプラインには、さまざまな計算モデルやテクノロジのタスクを同期できるワークフロー管理ツールが必要です。Azure Data Factory (ADF) には、Azure の分析やストレージ サービスをネイティブにサポートし、入力データの可用性に応じてタスクを調整できるワークフロー管理システムが用意されています。ADF では、HDInsight と Azure Data Lake Analytics をサポートしているほか、Azure Storage、Azure Data Lake Store、Azure SQL データベース、およびオンプレミス データ ソース間のデータの移行もサポートしています。

その他のストリーミング テクノロジ

今回は、HDInsight で Storm を使用したリアルタイム ストリーミング分析の基本を紹介しました。もちろん、Storm は独自のデータセンターやラボの、独自のコンピューター クラスター上にセットアップすることもできます。Storm のディストリビューションは、Hortonworks、Cloudera、または直接 Apache から入手できます。そのような場合のインストールと構成は、非常に時間がかかりますが、概念とコード成果物は同じです。

Spark (spark.apache.org、英語) は、リアルタイム分析に使用でき、大きな人気を得ているもう 1 つの Apache プロジェクトです。一般的なビッグ データ処理が可能ですが、メモリ内処理とストリーミング関数ライブラリがサポートされているため、パフォーマンスの高いリアルタイム処理のための興味深い選択肢となっています。HDInsight には、クラスターの種類として Spark が用意されています。これを使ってこのテクノロジを試すことができます。このサービスには、Zeppelin と Jupyter のノートブックが含まれています。ノートブックは、これらの言語でクエリを作成して、対話的に結果を表示できるインターフェイスです。ノートブックは、ビッグ データ セットに対するデータの調査やクエリの開発に最適です。

だんだん複雑になるビッグ データ分析のシナリオを組織が順に処理していくことで、リアルタイム ストリーミング分析への関心が培われています。同時に、この分野のテクノロジが継続して成長、成熟し、ビッグ データから洞察を得る新しいチャンスを生み出しています。これらのテーマについては、Spark や Azure Data Lake Analytics などのテクノロジの使用方法に関する今後のコラムをお楽しみにお待ちください。


Omid Afnan は、分散演算システムとそれに関連する開発ツールチェーンの実装に取り組んでいる、Azure ビッグ データ チームの主任プログラム マネージャーで、中国に在住および勤務しています。連絡先は omafnan@microsoft.com (英語のみ) です。

この記事のレビューに協力してくれた技術スタッフの Asad Khan と Ravi Tandon に心より感謝いたします。
Ravi Tandon (マイクロソフト)、Asad Khan (マイクロソフト)

Asad Khan は、Azure HDInsight Service を通じて、Hadoop で実現されるクラウド エクスペリエンスに取り組んでいる、マイクロソフト ビッグ データ グループの主任プログラム マネージャーです。現在は Spark と Apache Storm によるリアルタイム分析に重点を置いています。ここ数年は、Hadoop、OData、ビッグ データに対する BI などの、マイクロソフトの次世代データ アクセス テクノロジに取り組んでいます。Asad はスタンフォード大学の博士号を取得しています。

Ravi Tandon は、Microsoft Azure HDInsight チームのシニア ソフトウェア エンジニアです。彼は、Microsoft Azure HDInsight の Apache Storm と Apache Kafka サービスを担当しています。