December 2015

Volume 30 Number 13

テストの実行 - .NET 開発者向け Spark 入門

James McCaffrey | December 2015

James McCaffreySpark は、ビッグ データ向けオープン ソース コンピューティング フレームワークで、特に機械学習のシナリオに利用される場面が増えています。今回は、Windows OS を実行するコンピューターに Spark をインストールする方法と、.NET 開発者の立場から見た Spark の機能を解説します。

今回の目標を確認するには、図 1 に示す対話型セッションのデモを見るのが一番です。管理者モードで実行した Windows コマンド シェルから、spark-shell コマンドを実行して、Spark 環境を開始します。

実行中の Spark
図 1 実行中の Spark

spark-shell コマンドにより、シェルで実行する Scala インタープリターが生成され、Scala プロンプト (scala>) が表示されます。Scala は、Java を基にしたスクリプト言語です。Spark を操作する方法はほかにもありますが、Spark フレームワークがほぼ Scala で記述されていることから、Scala インタープリターを使用するのが最も一般的なアプローチです。Python 言語のコマンドを使用するか、Java プログラムを記述して、Spark を操作することも可能です。

図 1 に、いくつか警告メッセージが表示されているのがわかります。Spark にはオプションのコンポーネントが多数あり、見つからない場合に警告メッセージが表示されるため、Spark を実行する際には、このようなメッセージがよく表示されます。シンプルなシナリオでは、一般に警告メッセージを無視してもかまいません。

デモ セッションで最初に入力しているのは、以下のコマンドです。

scala> val f = sc.textFile("README.md")

このコマンドは、大まかには「f という変更不可の RDD オブジェクトに、README.md ファイルのコンテンツを格納しなさい」と言っています。Scala オブジェクトは「val」または「var」で宣言します。「val」で宣言したオブジェクトは変更できません。

Scala インタープリターには、Spark 機能へのアクセスに使用する、sc という組み込みの Spark コンテキスト オブジェクトがあります。textFile 関数は、RDD (Resilient Distributed Dataset) という Spark データ構造にテキスト ファイルのコンテンツを読み込みます。RDD は、Spark で使用するプログラミングの主要抽象表現です。RDD は、複数のコンピューターにまたがる RAM に格納される .NET コレクションのようなものと考えてかまいません。

README.md テキスト ファイル (.md 拡張子はマークダウン ドキュメントの略称) は、Spark のルート ディレクトリ C:\spark_1_4_1 にあります。ターゲット ファイルが別の場所にある場合は、C:\\Data\\ReadMeToo.txt のようにフルパスを指定します。

デモ セッションの 2 つ目のコマンドは以下のとおりです。

scala> val ff = f.filter(line => line.contains("Spark"))

これは、「ff という変更不可の RDD オブジェクトに、オブジェクト f の中で単語 "Spark" を含む行だけを格納しなさい」と言っています。filter 関数は、いわゆるクロージャを受け取ります。クロージャは、匿名関数のようなものと考えてかまいません。このクロージャは、line というダミー文字列入力パラメーターを受け取り、line に "Spark" が含まれている場合は true を、それ以外の場合は false を返します。

"line" はパラメーター名にすぎないので、以下に示すように、クロージャで別の名前を使用してもかまいません。

ln => ln.contains("Spark")

Spark では大文字と小文字が区別されるため、以下のコードはエラーになります。

ln => ln.Contains("Spark")

Scala には、関数型プログラミング言語の特徴がいくつかあるため、複数のコマンドを組み合わせることもできます。たとえば、最初の 2 つのコマンドを組み合わせて、1 つのコマンドにすることができます。

val ff = sc.textFile("README.md").filter(line => lne.contains("Spark"))

デモ セッション最後の 3 つのコマンドは以下のとおりです。

scala> val ct = ff.count()
scala> println(ct)
scala> :q

count 関数は RDD の項目数を返します。今回は、README.md ファイル内にある "Spark" という単語を含む行数を返します。このような行は 19 行あります。Spark Scala セッションを終了するには、「:q」コマンドを入力します。

Windows コンピューターでの Spark のインストール

Windows コンピューターに Spark をインストールするには、大きく 4 つの手順があります。最初に、Java 開発キット (JDK) と Java ランタイム環境 (JRE) をインストールします。次に、Scala 言語をインストールします。続いて、Spark フレームワークをインストールします。最後に、ホスト コンピューターのシステム変数を構成します。

Spark の配布物は、.tar 圧縮形式で提供されるため、Spark のファイルを展開するユーティリティが必要です。事前に、オープン ソースの 7-Zip プログラムをインストールすることをお勧めします。

Spark と Spark のコンポーネントは、Windows OS の広範なバージョンで正式にサポートされていませんが、Windows 7、Windows 8、Windows 10、Windows Server 2008、Windows Server 2012 を実行する各コンピューターに Spark をインストールすることができました。図 1 に示すデモは、Windows 8.1 コンピューターで実行したものです。

JDK は、自己解凍実行可能ファイルを実行してインストールします。この自己解凍ファイルはインターネットで検索すれば見つかります。今回は jdk-8u60-windows-x64.exe バージョンを使用しています。

64 ビット バージョンの JDK をインストールする場合、既定のインストール ディレクトリは C:\Program Files\Java\jdkx.x.x_xx\ です (図 2 参照)。この既定のインストール ディレクトリは変更しないことをお勧めします。

JDK の既定の場所
図 2 JDK の既定の場所

JDK をインストールすると、関連する JRE もインストールされます。インストールが完了すると、Java 既定の親ディレクトリには、JDK ディレクトリと関連 JRE ディレクトリの両方が含まれています (図 3 参照)。

C:\Program Files\Java\ にインストールされた Java JDK と JRE
図 3 C:\Program Files\Java\ にインストールされた Java JDK と JRE

コンピューターによっては、C:\Program Files (x86) ディレクトリに、1 つ以上の 32 ビット JRE ディレクトリを持つ Java ディレクトリが 1 つ含まれる場合もあります。コンピューターに、32 ビット バージョンの JRE と 64 ビット バージョンの JRE が両方あっても問題ありませんが、64 ビット バージョンの Java JDK のみ使用することをお勧めします。

Scala のインストール

次の手順では Scala 言語をインストールしますが、インストールを実行する前に、Spark ダウンロード サイト (詳細後述) に移動して、どのバージョンの Scala をインストールするかを決める必要があります。Scala のバージョンは、次の手順でインストールする Spark バージョンとの互換性を確保することが求められます。

残念ながら、Scala と Spark のバージョン互換性に関する情報はあまり提供されていません。Spark コンポーネントをインストールした時点 (本稿公開のかなり前) では、Spark の最新バージョンは 1.5.0 でしたが、どのバージョンの Scala がバージョン 1.5.0 の Spark と互換性があるかを示す情報は見つかりませんでした。そこで、1.4.1 という以前のバージョンの Spark を確認したところ、開発者向けのディスカッション Web サイトで、バージョン 2.10.4 の Scala がバージョン 1.4.1 の Spark と互換性があることを示す情報が見つかりました。

Scala のインストールは簡単です。インストール プロセスに必要なのは、.msi インストール ファイルだけです。

Scala インストール ウィザードに従い、インストール プロセスを実行します。興味深いことに、Scala の既定のインストール ディレクトリは、64 ビットのディレクトリ C:\Program Files\ ではなく、32 ビットのディレクトリ C:\Program Files (x86) です (図 4 参照)。

C:\Program Files (x86)\scala\ にインストールされる Scala
図 4 C:\Program Files (x86)\scala\ にインストールされる Scala

Scala コマンドを使用するのではなく、Java プログラムを記述して Spark を操作する場合は、Scala Simple Build Tool (SBT) という別のツールをインストールする必要があります。コンパイル済みの Java プログラムを使用して Spark を操作するのは、対話型の Scala を使用するよりもはるかに難しくなります。

Spark のインストール

次は、Spark フレームワークをインストールしますが、まず、.tar 形式のファイルから展開できる 7-Zip などのユーティリティ プログラムを用意します。Spark のインストール プロセスは手作業です。ローカル コンピューターに圧縮済みのフォルダーをダウンロードして、圧縮ファイルを展開後、ルート ディレクトリに展開したファイルをコピーします。つまり、Spark をアンインストールする場合は、Spark のファイルを削除するだけです。

Spark のサイトは、spark.apache.org です。ダウンロード ページでは、バージョンとパッケージの種類を選択できます。Spark はコンピューティング フレームワークであり、分散ファイル システム (DFS) が必要です。Spark フレームワークと最もよく一緒に使用される DFS は、Hadoop 分散ファイル システム (HDFS) です。図 1 に示すデモ セッションなど、テストや実験を目的とする場合は、DFS のないシステムに Spark をインストールしてもかまいません。今回のシナリオでは、Spark はローカル ファイル システムを使用します。

.tar ファイルを展開した経験が少ないと、展開の実行を 2 回要求されることに少し戸惑うかもしれません。最初に、任意の一時ディレクトリ (今回は C:\Temp) に、.tar ファイル (今回は spark-1.4.1-bin-hadoop2.6.tar ファイル) をダウンロードします。次に、.tar ファイルを右クリックし、コンテキスト メニューの [展開] を選択して、一時ディレクトリ内の新しいディレクトリに展開します。

最初の展開プロセスでは、拡張子なしの新しい圧縮ファイル (今回は spark-1.4.1-bin-hadoop2.6) が作成されます。次に、この拡張子なしの新しい圧縮ファイルを右クリックし、コンテキスト メニューの [展開] をもう一度クリックして、別のディレクトリに展開します。この 2 回目の展開で、Spark フレームワークのファイルが作成されます。

Spark フレームワークのファイル用にディレクトリを作成します。共通表記規則では、C:\spark_x_x_x というディレクトリを作成します (x の値はバージョン)。この規則に従って、C:\spark_1_4_1 ディレクトリを作成し、このディレクトリに展開したファイルをコピーします (図 5 参照)。

展開した Spark ファイルを手動で C:\spark_x_x_x\ にコピー
図 5 展開した Spark ファイルを手動で C:\spark_x_x_x\ にコピー

コンピューターの構成

Java、Scala、および Spark をインストールしたら、最後にホスト コンピューターを構成します。そのためには、Windows 向けに必要な特殊なユーティリティ ファイルをダウンロードし、ユーザー定義のシステム環境変数を 3 つ設定して、システム Path 変数を設定し、オプションで Spark の構成ファイルを変更します。

Windows で Spark を実行するには、C:\hadoop というローカル ディレクトリに、winutils.exe という特殊なユーティリティ ファイルが存在する必要があります。このファイルは、インターネットで検索すればさまざまな場所で見つかります。C:\hadoop ディレクトリを作成し、winutils.exe (http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe) を実行して、C:\hadoop ディレクトリにファイルをダウンロードします。

次に、ユーザー定義のシステム環境変数を 3 つ作成して設定し、システム Path 変数を変更します。コントロール パネルに移動して、[システムとセキュリティ]、[システム]、[システムの詳細設定]、[詳細設定]、[環境変数] の順にクリックします。[ユーザー環境変数] セクションで、変数名と値を以下のように設定して 3 つの新しい変数を作成します。

JAVA_HOME     C:\Program Files\Java\jdk1.8.0_60
SCALA_HOME    C:\Program Files (x86)\scala
HADOOP_HOME   C:\hadoop

次に、[システム環境変数] で Path 変数を編集して、Spark のバイナリの場所 C:\spark_1_4_1\bin を追加します。Path 変数のいずれの値も削除しないように注意してください。Scala のバイナリの場所は、Scala のインストール プロセスによって自動的に追加されます (図 6 参照)。

システムの構成
図 6 システムの構成

システム変数を設定してから、Spark 構成ファイルを変更することをお勧めします。ルート ディレクトリ C:\spark_1_4_1\config に移動して、log4j.properties.template ファイルのコピーを作成します。.template 拡張子を削除して、このコピーの名前を変更します。最初の構成エントリを、log4j.rootCategory=INFO から log4j.rootCategory=WARN に変更します。

Spark の既定の設定のままだと、すべての種類のメッセージが表示されるためです。ログ レベルを INFO から WARN に変更することで、メッセージの数が大幅に減少し、Spark の操作がスムーズになります。

Spark の Hello World

分散コンピューティングにおける Hello World は、データ ソース内のさまざまな単語を数える例です。図 7 に、Spark を使用した単語カウントの例を示します。

Spark を使用した単語のカウント
図 7 Spark を使用した単語のカウント

Scala シェルは、Read-Eval-Print Loop (REPL) シェルと呼ばれることもあります。Scala REPL シェルは、CTRL キーと L キーを同時に押すことでクリアできます。図 7 の最初のコマンドでは、以前と同じように、「f という RDD に README.md ファイルのコンテンツを読み込み」ます。実際のシナリオでは、データ ソースは、数百台ものコンピューターにまたがる巨大なファイルだったり、Cassandra などの分散型データベースだったりします。

次は、以下のコマンドです。

scala> val fm = f.flatMap(line => line.split(" "))

flatMap 関数呼び出しでは、RDD オブジェクト「f」内の各行を空白文字で分割し、結果を RDD オブジェクト「fm」に格納します。結果は、ファイル内のすべての単語のコレクションになります。開発者の観点では、「fm」は .NET List<string> コレクションのようなものと考えられます。

次は、以下のコマンドです。

scala> val m = fm.map(word => (word, 1))

map 関数は、単語と整数値 1 でそれぞれ構成した項目のペアを保持する RDD オブジェクトを作成します。m.take(5) コマンドを実行すると、そのペアの内容を確認できます。README.md ファイルにある最初の 5 個の単語が表示され、それぞれの単語の隣に値 1 が表示されます。開発者の観点では、「m」は Pair オブジェクトがそれぞれ文字列と数値で構成されている List<Pair> コレクションとほぼ同じです。文字列 (README.md の単語) がキーで、整数値が値です。ただし、Microsoft .NET Framework のキーと値のペアの多くとは異なり、Spark ではキーの重複が許可されます。キーと値のペアを保持する RDD オブジェクトは、普通の RDD と区別するために、ペア RDDと呼ばれることもあります。

次は、以下のコマンドです。

scala> val cts = m.reduceByKey((a,b) => a + b)

reduceByKey 関数は、同じキー値に関連付けられている整数値を加算することで、オブジェクト m の該当項目を結合します。cts.take(10) を実行すると、README.md ファイルに含まれる 10 個の単語と、その単語の出現回数が表示されます。cts オブジェクト内の単語は、特定の順序で並べられていなくてもかまいません。

reduceByKey 関数は、クロージャを受け取ります。以下のように、Scala の別のショートカット表記も使用できます。

scala> val cts = m.reduceByKey(_ + _)

アンダースコアは、ワイルド カード パラメーターです。したがって、この構文は「受け取った 2 つの値をすべて加算する」と解釈できます。

この単語カウントの例では、map 関数を使用後に、reduceByKey 関数を使用しています。これは、MapReduce パラダイムの一例です。

次は、以下のコマンドです。

scala> val sorted =
     cts.sortBy(item => item._2, false)

このコマンドは、項目の 2 つ目のパラメーター (単語数を表す整数値) に基づいて、cts RDD の項目を並べ替えます。false 引数は、降順 (つまり、大きい数値から小さい数値の順) で並べ替えることを意味します。並べ替えコマンドの Scala ショートカット構文は以下のとおりです。

scala> val sorted = cts.sortBy(_._2, false)

Scala は関数型言語の特徴を数多く持っており、キーワードの代わりに多くの記号を使用するため、直感的にはわかりにくい難解な Scala コードになってしまう可能性があります。

この Hello World 例の最後は、結果を表示するコマンドです。

scala> sorted.take(5).foreach(println)

上記のコマンドは、「sorted という RDD オブジェクトの先頭から 5 つのオブジェクトを取り出し、そのコレクションを反復処理して、その各項目に対して println 関数を実行する」という意味です。結果は以下のとおりです。

(,66)
(the,21)
(Spark,14)
(to,14)
(for,11)

README.md ファイルには空または null が 66 個、「the」が 21 個、「Spark」が 14 個という具合に単語が含まれています。

まとめ

Windows コンピューターで Spark を試す場合、今回示した情報を出発点として利用できます。Spark は (2009 年に カリフォルニア大学バークレー校で作成された) 比較的新しいテクノロジですが、少なくとも同僚の間では、ここ数か月の間に Spark に対する関心が飛躍的に高まっています。

2014 年に開催されたビッグ データ処理フレームワークに関する大会では、前年の Hadoop システムによるパフォーマンス記録を優に上回る、新しい記録を Spark が打ち立てています。並外れたパフォーマンス特性により、Spark は、特に機械学習システムに利用するのが最適です。Spark は、MLib というオープン ソース機械学習アルゴリズムのライブラリをサポートしています。


Dr. James McCaffrey は、ワシントン州レドモンドにある Microsoft Research に勤務しています。これまでに、Internet Explorer、Bing などの複数のマイクロソフト製品にも携わってきました。McCaffrey 博士の連絡先は、jammc@microsoft.com (英語のみ) です。

この記事のレビューに協力してくれたマイクロソフト技術スタッフの Gaz Iqbal および Umesh Madan に心より感謝いたします。