2018 年 4 月

第 33 卷,第 4 期

此文章由机器翻译

人工智能 - Apache Spark ML 简介

通过Frank La La |年 4 月 2018

Frank La VigneApache Spark ML 是机器学习库在 Apache Spark 上运行的模块。Spark 本身是提供数据工程师和数据科研人员用于编程的商用计算机的整个群集数据并行和容错的接口的群集计算环境。Spark 还支持多种语言,如 Java、 Scala、 Python 和。它还本机提供的 Jupyter 笔记本服务。请参阅我年 2 月人为智能列 (msdn.com/magazine/mt829269) 的 Jupyter 笔记本,如果你不熟悉它们的基础知识。在本文中,我将探讨在 Azure 上的 HDInsight 群集上 Juptyer 笔记本中使用 Spark ML。

在 Azure 上的 Apache Spark 入门

若要使用 Spark ML,第一步是创建 Spark 群集。登录到 Azure 门户并选择"创建资源,",然后选择 HDInsight,如下所示图 1。边栏选项卡显示逐步完成创建 HDInsight 群集的过程。 

创建新的 HDInsight 资源
图 1 创建新的 HDInsight 资源

第一个边栏选项卡,标记为基础知识,涵盖的群集,群集名称,如基本属性管理员和 ssh 凭据,以及资源组和位置。群集名称中 azurehdinsight.net 域必须是唯一的。有关引用,请参阅图 2。特定是重要性的群集类型的选项,随后会显示另一个边栏选项卡。在此边栏选项卡中, 所示图 3,将群集类型设置为 Apache Spark 和到 Spark 2.10 (HDI 3.6) 的版本。单击选择以保存设置并关闭边栏选项卡。

快速的基础知识步骤用于设置 HDInsight 群集创建过程
图 2 的基础知识步骤中快速创建进程有关设置 HDInsight 群集

为 HDInsight 群集选择作为群集类型的 Spark
为 HDInsight 群集选择作为群集类型的 Spark 的图 3

下一步配置群集的存储选项。将主存储类型和选择方法保留为其默认值。存储容器,单击新创建和将其命名为"msdnsparkstorage"并在默认容器文本框中设置默认容器"sparkstorage"。单击下一步可用于访问安装过程的摘要步骤。此屏幕提供查看和机会修改以及估计每小时的成本,以运行群集的群集安装程序。需要特别注意,必须始终删除在不使用时的群集。与在 Azure 中的虚拟机,不同的 HDInsight 群集没有一个选项以暂停和停止计费。单击创建并记下可能需要最多 20 分钟来实例化群集的通知。

HDFS 和 Azure Blob 存储

Spark,Hadoop,如使用 Hadoop 分布式文件系统 (HDFS) 作为其群集范围内的文件存储区。HDFS 旨在可靠地存储大型数据集,并使这些数据集在群集上运行的应用程序可以快速访问。顾名思义,HDFS 来自内部 Hadoop 和 Spark 也支持。

当运行 HDInsight 群集时,Azure 存储 Blob 将无缝地映射到 HDFS。这一事实可以很容易地上载和下载数据,与附加到 Spark 群集的 HDFS 文件存储。事实上,获取启动项目的第一步将使用 Azure 存储资源管理器上载要使用的数据文件。Azure 存储资源管理器是用于管理在 Azure 中的数据存储区的免费的跨平台的实用工具。如果你还没有安装,请这样等待要初始化的 Spark 群集时 (azure.microsoft.com/features/storage-explorer)。

设置群集和 Azure 存储资源管理器配置为访问为群集中创建的存储帐户后图 2,打开 Azure 存储资源管理器并浏览到使用树控件 (左侧的"msdndemo"blob图 4)。单击"msdndemo"以显示内容根目录下的 blob,然后单击新文件夹,并在创建新的虚拟目录对话框中,输入新的文件夹的名称: 航班。单击确定以创建该文件夹。接下来,单击上载按钮,选择上载文件,单击省略号按钮,然后浏览到本文中,"06 2015.csv"CSV 数据文件 单击上载可将文件上载到 Blob 存储区。

在为 Spark 群集创建的 Blob 容器中创建一个新文件夹
图 4 在为 Spark 群集创建的 Blob 容器中创建一个新文件夹

现在,上载数据文件时,就可以开始使用 PySpark 笔记本中的文件。Spark Python API commonlyreferred 到作为 PySpark,公开给 Python 的 Spark 编程模型。对于习惯于 Python 的开发人员,PySpark 会感到非常熟悉。Spark 网站提供的很好的介绍性说明对环境和与标准 Python (bit.ly/2oVBuCy)。

Jupyter 笔记本在 Spark

Apache Spark HDInsight 实现包括已在群集上运行的 Jupyter 笔记本的实例。访问环境的最简单方法是浏览到在 Azure 门户上的 Spark 群集边栏选项卡。在概述选项卡上,单击任一项标记为群集仪表板 (图 5)。在显示的边栏选项卡,单击 Jupyter 笔记本磁贴。如果凭据面临的挑战,使用前面创建的群集登录凭据。

启动 Jupyter 笔记本环境为 Spark 群集
图 5 为 Spark 群集启动 Jupyter 笔记本环境

一旦加载群集的 Jupyter 笔记本服务的主页,单击新建,然后选择 PySpark3 以创建新的 PySpark3 笔记本,,如下图中所示图 6

创建新的 PySpark 3 Jupyter 笔记本
图 6 创建新的 PySpark 3 Jupyter 笔记本

执行此操作将创建新的空白笔记本与空单元格。在此第一个单元格,输入以下代码以加载更早版本上载到 blob 的 CSV 文件。然后再按控件 + 键盘上的 Enter 执行代码,首先检查代码:

flights_df = spark.read.csv('wasb:///flights/06-2015.csv',
  inferSchema=True, header=True)

请注意在 URL 中的"wasb"协议处理程序。WASB 代表 Windows Azure 存储 Blob,并提供 Hadoop 和 Azure Blob 存储之间的接口。有关如何执行此操作和为什么这很重要,有关详细信息,请参阅博客文章,"为什么 WASB 使 Hadoop 上 Azure 因此太酷"(bit.ly/2oUXptz),和"了解 WASB 和 Hadoop 在 Azure Storage"(bit.ly/2ti43zu),同时由 Microsoft 开发人员 Cindy 毛编写。现在,但是,关键的一点是 Azure Blob 可以充当永久文件将存储的数据,即使未在运行 Spark 群集。此外,此处存储的数据为可访问的应用程序支持 Azure Blob 存储或 HDFS。

仍在此文本框上焦点,按控件 + Enter。立即,该单元格下的空间将读取"正在启动 Spark 应用程序。" 片刻之后,会显示有关只需运行的作业的和通知的某些数据的一个表,"SparkSession 是可用作 spark。"参数传递到 spark.read.csv 方法自动推断架构,指定的文件有标题行。CSV 文件的内容已加载到数据帧对象。若要查看架构,在新创建的空白单元格中输入下面的代码,然后通过按控件 + Enter 执行代码:

flights_df.printSchema()

架构出现并显示的名称和每个字段的数据类型。CSV 文件的标题行中字段的名称匹配。

在详细信息中的数据框架

在 Spark 中,数据框架是分布式的包含命名列的行集合。事实上,数据框架提供接口类似于关系数据库中的表或 Excel 工作表表具有命名的列标题。事实上,数据框架提供一个 API 到 Spark 的较低级别的基本数据结构: 弹性分布式数据集 (RDD)。RDDs 是容错的、 不可变、 分布式可以得到并行中的 Spark 群集的辅助节点上的对象的集合。

RDDs 本身已划分为较小的部分调用分区。Spark 自动确定要拆分 RDD 到其中的分区数。分区分布在群集上的节点。当 RDD 上执行操作,则每个其分区将启动任务和并行执行操作。恰当地说,对于那些 Spark 的新手,大多数体系结构的抽象出来: 第一个由 RDD 数据结构和 Spark 中,然后通过数据帧 API 的更高级别的抽象。

浏览与数据框架的数据

数据框架公开多个方式浏览数据。例如,若要查看数据的形状,下列代码输入到空白单元格,然后执行代码:

recordCount = flights_df.count()
column_count = len(flights_df.columns)
print ("DataFrame has " + str(recordCount) + "rows and " + str(
  column_count) + " columns.")

将读取最终产生的响应:

DataFrame has 503897 rows and 26 columns.

若要查看离开 BWI 机场航班,一个空单元格中输入以下并执行:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI')
flights_leaving_BWI.show()

在 flights_leaving_BWI 中创建新的数据帧,与从 flights_df 数据帧中筛选的结果代码。Show 方法显示的前 20 个行。结果看起来可能有点杂乱无章由于 Web 页面,数据帧包含 26 列的格式设置约束的特性。右现在,我只是想要查看航班运营商、 航班号、 月份和航班目标的一天。若要执行此操作,修改以下的单元格中的代码,并再次执行:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI').select("UNIQUE_CARRIER",
  "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "DEST")
flights_leaving_BWI.show()

格式将效果更好,因为有只有五列,返回并将更好地符合拖到网页。如果你想要通过航班目标对结果进行排序和显示的前 100 行而不是第一个 20?中的单元格的代码更改为以下做到这一点:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI').select("UNIQUE_CARRIER",
  "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "DEST").sort("DEST")
flights_leaving_BWI.show(100)

如果你想要创建的所有传出 DC 区域三个主要机场航班的数据帧?若要做到这一点,下列代码输入到新的单元格,并执行:

flights_leaving_DCmetro = flights_df.where( (flights_df.ORIGIN == 'BWI') |
  (flights_df.ORIGIN == 'DCA') | (flights_df.ORIGIN == 'IAD') ).select(
  "UNIQUE_CARRIER", "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "ORIGIN", "DEST")
flights_leaving_DCmetro.show()

请注意,我添加源字段到此数据帧,以显示从,其中发起航班。

创建新的单元格,输入下面的代码并执行它:

print ("Flights leaving BWI: " + str(flights_leaving_BWI.count()))
print ("Flights leaving DC Metro: " + str(flights_leaving_DCmetro.count()))

返回的结果应如下所示:

Flights leaving BWI: 8422
Flights leaving DC Metro: 18502

如你所见,使用数据框架得直观而简单。并行执行数据查询的执行完成的繁重任务可通过 Spark 核心引擎。此外,任何人熟悉 Hive 或 Pig 可发现这些查询的执行时间为 Spark 中的速度要快很。

使用 SparkML 创建预测模型

若要使用 Spark ML 创建预测模型,我需要将某些库导入到我的项目。若要执行此操作,创建新的单元格,输入下面的代码,并执行它:

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

接下来,我需要我需要创建预测模型的航班延迟的可能性字段创建新的数据帧。下面的代码 pares 只六个数据帧中的原始 26 列下。当原始数据集具有时,该值指示是否某个航班已延迟 15 分钟或更多的列时,我想要添加一些更高的粒度。我将创建一个新的列设置为 1,如果某个航班延迟 10 分钟或更多并命名该列"IsDelayed。" 此外,我调用 dropna 函数,这会导致断开具有 null 值的任何行。ML 算法可以繁琐有关它们接收的数据输入。非常频繁意外的 null 值将引发异常,或糟糕的是,损坏的结果。下列代码输入到新的单元格,并执行它。结果将显示 model_data 的前 20 个行:

model_data = flights_df.select("DAY_OF_MONTH", "DAY_OF_WEEK", "ORIGIN_AIRPORT_ID",
  "DEST_AIRPORT_ID", "DEP_DELAY", ((col("ARR_DELAY") >
  10).cast("Int").alias("IsDelayed"))).dropna()
  model_data.show()

监管的学习

在受监督 ML 识别地面点真实在内。在这种情况下,它为在美国国内的航班准时到达记录 2015 年 6 月。基于该数据,该算法将生成预测模型上是否将被航班的到达延迟 10 分钟或更多给定五个字段:月、 日的周、 源机场 ID、 目标机场 ID 和不同,延迟的某一天。在 ML 术语中,这些字段被统称为"功能。" 中的到达延迟为 10 分钟或小于 10 分钟,一个指示器的这种情况下预测的值称为"label"。

监管的 ML 模型通常与它们在定型在其的相同数据集测试。若要执行此操作,它们随机拆分为两个数据集: 一个用于定型,一个用于测试。通常情况下,它们是沿 60/40 或 70/30 行拆分与更大的共享进入定型数据。

下面的代码将定型数据分割开来,为两个测试数据沿 70/30 行设置并显示计数 (同样,将代码输入到新的单元格并执行它):

split_model_data = model_data.randomSplit([0.7,0.3])
training_data = split_model_data[0]
testing_data = split_model_data[1]
all_data_count = training_data.count() + testing_data.count()
print ("Rows in Training Data: " + str(training_data.count()))
print ("Rows in Testing Data: " + str(testing_data.count()))
print ("Total Rows: " + str(all_data_count))

总的行笔记本中显示此处及更早版本中将有差异。这是由于行具有丢弃的 null 值。

测试数据现在必须进行进一步的修改以满足 ML 算法的要求。表示的功能的五个字段都将合并到一个数组或一个向量,通过调用向量化的过程。IsDelayed 列将被重命名为标签。定型数据帧将具有仅使用两个列: 功能和标签。一个空单元格中输入下面的代码并执行它,并将显示数据帧的培训的前 20 个行:

vector_assembler = VectorAssembler(
  inputCols = ["DAY_OF_MONTH", "DAY_OF_WEEK", "ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID",
  "DEP_DELAY"], outputCol="features")
training = vector_assembler.transform(training_data).select(col("features"),
  col("IsDelayed").cast("Int").alias("label"))
  training.show(truncate=False)

与定型数据拆分为两个列,功能和标签,已准备好提供给 ML 算法。在这种情况下,即可选择逻辑回归。逻辑回归是用于分析数据的统计方法,其中一个或多个输入的变量影响结果。为此模型中,输入的变量是 DAY_OF_MONTH、 DAY_OF_WEEK、 ORIGIN_AIRPORT_ID、 DEST_AIRPORT_ID 和 DEP_DELAY 的字段的特征列的内容。结果为标签列,或如果航班延迟时间超过 10 分钟。此算法不会区分 15 小时的延迟并为 10 分钟 1 秒的延迟。创建模型的定型数据拟合到它。同样,下列代码输入到空白单元格,并执行它:

logR = LogisticRegression(labelCol="label", featuresCol="features",
  maxIter=10, regParam=0.1)
model = logR.fit(training)

使用定型模型,唯一要左执行是对其进行测试。此外必须修改测试数据以适合该算法的期望通过向量汇编程序运行它,因为定型数据。下列代码输入到空白单元格,并执行它:

testing = vector_assembler.transform(testing_data).select(col("features"),
  (col("IsDelayed")).cast("Int").alias("trueLabel"))
  testing.show()

现在,准备好定型数据下, 一步是通过调用转换方法通过模型运行它。输出是具有四个列的数据帧: 在其预测功能、 预测的值、 的实际值和概率中,已有多大把握算法的度量值。一次更一个空单元格中输入下面的代码并执行它:

prediction = model.transform(testing)
predicted = prediction.select("features", "prediction", "trueLabel", "probability")
predicted.show()

输出只显示前 20 个行。不测量的效用模型的一种高效方式。最佳方式执行操作要计数的算法预测以及正确多次它的次数不正确。但是,一个简单的"正确和错误"度量值并不总是判断出完整的情景。更好的度量值是内容称为"混淆矩阵",其中显示了数量以及数量的误报和漏报真阳性 true 假负。下列代码输入到空白单元格,并执行它,以显示此模型的混淆矩阵:

true_negatives = predicted.where( (predicted.prediction == '0.0') |
  (predicted.trueLabel == '0')).count()
true_positives = predicted.where( (predicted.prediction == '1.0') |
  (predicted.trueLabel == '1')).count()
false_negatives = predicted.where( (predicted.prediction == '0.0') |
  (predicted.trueLabel == '1')).count()
false_positives = predicted.where( (predicted.prediction == '1.0') |
  (predicted.trueLabel == '0')).count()
print ( "True Positive: " + str(true_positives) )
print ( "True Negative: " + str(true_negatives) )
print ( "False Positive: " + str(false_positives) )
print ( "False Negative: " + str(false_negatives) )

结果将是不鼓励。模型已显著错误个以上的是对。所有不会丢失,但是。当试验失败时,最好是做法的对结果进行分析,请调整模型参数并重试。这正是该字段称为"数据科学。"

总结

Spark 是一个快速且功能强大的群集计算环境中进行并行处理的数据的工作负荷具有模块化体系结构。PySpark 和 Spark ML,本文中介绍的两个模块均。PySpark 提供运行的 Python 运行时的 Spark 和弹性分发中的数据集 (RDDs) 的数据框架 API 窗体的高级抽象。Spark ML 库提供的机器学习 API 基于数据框架的数据。

机器学习是与数据科学的更大的字段的准则。当试验未产生所需的结果时,查找解决方案需要一种迭代方法。可能是 10 分钟是太粗略的时间间隔。可能五个以上的输入的字段会帮助发现模式。很可能是一个月累计的航班数据是不够的要建立的清晰模式的算法。若要知道的唯一方法是要保留试验,分析结果,并调整输入的数据和参数。


Frank La Vigne 负责领导 Wintellect 的数据与分析实践,共同主持 DataDriven 播客。他定期在 FranksWorld.com 上发表博客,你还可以在他的 YouTube 频道“Frank’s World TV”(FranksWorld.TV) 中看到他。

衷心感谢以下技术专家对本文的审阅:Andy Leonard


在 MSDN 杂志论坛讨论这篇文章