April 2018

Volume 33 Number 4

[Artificially Intelligent]

Introducing Apache Spark ML

By Frank La Vigne | April 2018

Apache Spark ML is a machine learning library module that runs on top of Apache Spark. Spark itself is a cluster computing environment that gives data engineers and data scientists an interface for programming entire clusters of commodity computers with data parallelism and fault tolerance. Spark supports a number of languages, such as Java, Scala, Python and R, and Spark clusters on HDInsight come with a Jupyter Notebook service built in. Please refer to my February Artificially Intelligent column (msdn.com/magazine/mt829269) for the fundamentals of Jupyter Notebooks if you’re not familiar with them. In this article, I’ll explore using Spark ML in a Jupyter Notebook on an HDInsight cluster on Azure.

Getting Started with Apache Spark on Azure

To work with Spark ML, the first step is to create a Spark cluster. Log into the Azure Portal and choose “Create a resource,” then choose HDInsight, as shown in Figure 1. The blades that appear walk through the process of creating an HDInsight cluster. 

Figure 1 Creating a New HDInsight Resource

The first blade, labeled Basics, covers essential properties of the cluster, such as the Cluster Name, administrator and SSH credentials, and the resource group and location, as shown in Figure 2. The cluster name must be unique across the azurehdinsight.net domain. Of particular importance is the Cluster Type option, which brings up yet another blade. In this blade, shown in Figure 3, set the cluster type to Apache Spark and the version to Spark 2.1.0 (HDI 3.6). Click Select to save the settings and close the blade.

Figure 2 The Basics Step of the Quick Create Process for Setting Up an HDInsight Cluster

Figure 3 Choosing Spark as the Cluster Type for the HDInsight Cluster

The next step configures storage options for the cluster. Leave Primary Storage Type and Selection Method at their defaults. For the storage account, click Create New and name it “msdnsparkstorage,” then set the default container to “sparkstorage” in the Default Container textbox. Click Next to get to the Summary step of the setup process. This screen offers a review of the cluster configuration, a chance to modify it, and the estimated hourly cost to run the cluster. Take special care to always delete clusters when not in use. Unlike virtual machines in Azure, HDInsight clusters do not have an option to pause and stop billing. Click Create and take note of the notification that it can take up to 20 minutes to instantiate the cluster.

HDFS and Azure Blob Storage

Spark, like Hadoop, uses the Hadoop Distributed File System (HDFS) as its cluster-wide file store. HDFS is designed to reliably store large datasets and make those datasets rapidly accessible to applications running on the cluster. As the name implies, HDFS originated within Hadoop and is also supported by Spark.

When running HDInsight clusters, Azure Storage Blobs seamlessly map to HDFS. This fact makes it simple to upload and download data to and from the HDFS file store attached to the Spark cluster. In fact, the first step to getting the project started will be using Azure Storage Explorer to upload the data file with which to work. Azure Storage Explorer is a free, cross-platform utility to manage data stores in Azure. If you don’t already have it installed, please do so while waiting for the Spark cluster to initialize (azure.microsoft.com/features/storage-explorer).

Once the cluster is set up and Azure Storage Explorer is configured to access the storage account created for the cluster, open Azure Storage Explorer and browse to the “msdndemo” blob container using the tree control on the left (Figure 4). Click on “msdndemo” to reveal the contents at the root of the container, then click New Folder and, in the Create New Virtual Directory dialog, enter the name for the new folder: flights. Click OK to create the folder. Next, click the Upload button, choose Upload Files, click the ellipsis button, and browse to the CSV data file for this article, “06-2015.csv.” Click Upload to upload the file to the blob store.

Figure 4 Creating a New Folder in the Blob Container Created for the Spark Cluster

Now that the data file is uploaded, it’s time to start working with it in a PySpark notebook. The Spark Python API, commonly referred to as PySpark, exposes the Spark programming model to Python. For developers accustomed to Python, PySpark will feel very familiar. The Spark Web site provides a great introductory explanation of the environment and how it differs from standard Python (bit.ly/2oVBuCy).

Jupyter Notebooks in Spark

The HDInsight implementation of Apache Spark includes an instance of Jupyter Notebooks already running on the cluster. The easiest way to access the environment is to browse to the Spark cluster blade on the Azure Portal. On the overview tab, click either of the items labeled Cluster Dashboard (Figure 5). In the blade that appears, click on the Jupyter Notebook tile. If challenged for credentials, use the cluster login credentials created earlier.

Figure 5 Launching the Jupyter Notebooks Environment for the Spark Cluster

Once the homepage for the cluster’s Jupyter Notebooks service loads, click New and then choose PySpark3 to create a new PySpark3 notebook, as depicted in Figure 6.

Figure 6 Creating a New PySpark 3 Jupyter Notebook

Doing this will create a new blank notebook with an empty cell. In this first cell, enter the following code to load the CSV file uploaded to the blob store earlier. Take a moment to examine the code before pressing Control+Enter to execute it:

flights_df = spark.read.csv('wasb:///flights/06-2015.csv',
  inferSchema=True, header=True)

Note the “wasb” protocol handler in the URL. WASB stands for Windows Azure Storage Blobs and provides an interface between Hadoop and Azure Blob storage. For more information about how this was done and why this is significant, refer to the blog posts, “Why WASB Makes Hadoop on Azure So Very Cool” (bit.ly/2oUXptz), and “Understanding WASB and Hadoop Storage in Azure” (bit.ly/2ti43zu), both written by Microsoft developer Cindy Gross. For now, however, the key takeaway is that Azure Blobs can act as persistent file stores for data even when the Spark cluster isn’t running. Furthermore, the data stored here is accessible by applications that support either Azure Blob storage or HDFS.
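
As an aside, the shorthand wasb:/// path resolves to the cluster’s default storage container. A fully qualified path names the container and storage account explicitly, which is useful when reading from a non-default container. Here’s a sketch of that form, using the storage account and container names from the setup steps as placeholders (substitute your own):

flights_path = 'wasb://sparkstorage@msdnsparkstorage.blob.core.windows.net/flights/06-2015.csv'
flights_df = spark.read.csv(flights_path, inferSchema=True, header=True)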

Back in the first cell, with the focus still on it, press Control+Enter. Immediately, the space beneath the cell will read “Starting Spark application.” After a moment, a table appears with some data about the job that just ran, along with a notification that the “SparkSession is available as ‘spark.’” The parameters passed to the spark.read.csv method automatically inferred a schema and indicated that the file has a header row. The contents of the CSV file were loaded into a DataFrame object. To view the schema, enter the following code into the newly created blank cell and then execute it by pressing Control+Enter:

flights_df.printSchema()

The schema appears and displays the name and datatype of each field. The names match the names of the fields from the header row of the CSV file.
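
Inferring the schema requires an extra pass over the file. When the layout of the data is known ahead of time, a schema can be supplied explicitly instead. The following is only a sketch declaring a handful of the 26 columns (the types shown are assumptions); a complete schema would declare the remaining fields the same way and be passed to the reader via the schema parameter:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# A partial, illustrative schema -- only a few of the 26 columns are declared here.
flights_schema = StructType([
  StructField("DAY_OF_MONTH", IntegerType()),
  StructField("DAY_OF_WEEK", IntegerType()),
  StructField("UNIQUE_CARRIER", StringType()),
  StructField("ORIGIN", StringType()),
  StructField("DEST", StringType()),
  StructField("DEP_DELAY", DoubleType()),
  StructField("ARR_DELAY", DoubleType())])
# With all 26 columns declared, the file would be read with:
# flights_df = spark.read.csv('wasb:///flights/06-2015.csv',
#   schema=flights_schema, header=True)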

DataFrames in Detail

In Spark, DataFrames are a distributed collection of rows with named columns. Practically speaking, DataFrames provide an interface similar to a table in a relational database or an Excel worksheet with named column headers. Under the covers, DataFrames provide an API to a lower-level, fundamental data structure in Spark: the resilient distributed dataset (RDD). RDDs are fault-tolerant, immutable, distributed collections of objects that can be worked on in parallel across the worker nodes in a Spark cluster.

RDDs themselves are divided into smaller pieces called partitions. Spark automatically determines the number of partitions into which to split an RDD and distributes them across the nodes of the cluster. When an action is performed on an RDD, a task is launched for each of its partitions and the work is executed in parallel. Happily, for those new to Spark, most of this architecture is abstracted away: first by the RDD data structure itself, then by the higher-level DataFrame API.
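
To peek beneath that abstraction, a DataFrame exposes its underlying RDD, which reports how many partitions Spark chose; repartition returns a new DataFrame spread across a different number of partitions. A quick sketch:

# Number of partitions Spark chose for the data behind the DataFrame.
print ("Partitions: " + str(flights_df.rdd.getNumPartitions()))
# Redistribute the data across eight partitions (the number here is arbitrary).
flights_df_repartitioned = flights_df.repartition(8)
print ("Partitions after repartitioning: " + str(flights_df_repartitioned.rdd.getNumPartitions()))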

Exploring Data with DataFrames

DataFrames expose multiple ways to explore data. For instance, to view the shape of the data, enter the following code into a blank cell and then execute the code:

record_count = flights_df.count()
column_count = len(flights_df.columns)
print ("DataFrame has " + str(record_count) + " rows and " +
  str(column_count) + " columns.")

The resulting response will read:

DataFrame has 503897 rows and 26 columns.
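
Beyond row and column counts, the describe method computes basic summary statistics (count, mean, standard deviation, min and max) for the columns passed to it, which is a quick way to get a feel for a numeric field. A brief sketch using the two delay columns:

# Summary statistics for the departure and arrival delay columns.
flights_df.describe("DEP_DELAY", "ARR_DELAY").show()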

To view only the flights leaving BWI airport, enter the following into an empty cell and execute:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI')
flights_leaving_BWI.show()

The code created a new DataFrame, flights_leaving_BWI, with results filtered from the flights_df DataFrame. The show method displays the top 20 rows. The results may look a bit disorganized due to the formatting constraints of the Web page and the fact that the DataFrame contains 26 columns. Right now, I just want to see the airline carrier, flight number, tail number, day of month and flight destination. To do this, modify the code in the cell to the following and execute it again:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI').select("UNIQUE_CARRIER",
  "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "DEST")
flights_leaving_BWI.show()

The formatting will look better because only five columns are returned, which fits more comfortably on the Web page. What if you wanted to sort the results by flight destination and show the first 100 rows as opposed to the first 20? Change the code in the cell to the following to do just that:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI').select("UNIQUE_CARRIER",
  "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "DEST").sort("DEST")
flights_leaving_BWI.show(100)

What if you wanted to create a DataFrame of all the flights departing the DC area’s three major airports? To do that, enter the following code into a new cell and execute:

flights_leaving_DCmetro = flights_df.where( (flights_df.ORIGIN == 'BWI') |
  (flights_df.ORIGIN == 'DCA') | (flights_df.ORIGIN == 'IAD') ).select(
  "UNIQUE_CARRIER", "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "ORIGIN", "DEST")
flights_leaving_DCmetro.show()

Note that I added the ORIGIN field to this DataFrame to show where each flight originated.
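
As an aside, the same filter can be written more compactly with the isin method on a column, which replaces the three OR’d equality tests. A sketch:

# Equivalent filter on the three DC-area airports using isin.
dc_airports = ['BWI', 'DCA', 'IAD']
flights_leaving_DCmetro = flights_df.where(flights_df.ORIGIN.isin(dc_airports)).select(
  "UNIQUE_CARRIER", "FL_NUM", "TAIL_NUM", "DAY_OF_MONTH", "ORIGIN", "DEST")
flights_leaving_DCmetro.show()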

Create a new cell, enter the following code and execute it:

print ("Flights leaving BWI: " + str(flights_leaving_BWI.count()))
print ("Flights leaving DC Metro: " + str(flights_leaving_DCmetro.count()))

The results returned should look something like this:

Flights leaving BWI: 8422
Flights leaving DC Metro: 18502

As you can see, working with DataFrames is intuitive and simple. The heavy lifting to parallelize the execution of the data queries is done by the Spark core engine. Additionally, anyone familiar with Hive or Pig will notice that execution time on these queries is significantly faster in Spark.
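
In fact, readers coming from Hive can keep writing SQL: a DataFrame can be registered as a temporary view and queried through the Spark SQL engine, which shares the same optimizer as the DataFrame API. A minimal sketch:

# Register the DataFrame as a temporary view and query it with Spark SQL.
flights_df.createOrReplaceTempView("flights")
flights_by_dest = spark.sql(
  "SELECT DEST, COUNT(*) AS flight_count FROM flights " +
  "WHERE ORIGIN = 'BWI' GROUP BY DEST ORDER BY flight_count DESC")
flights_by_dest.show()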

Creating a Predictive Model with Spark ML

To create a predictive model with Spark ML, I need to import some libraries into my project. To do this, create a new cell, enter the following code, and execute it:

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

Next, I need to create a new DataFrame with just the fields required to build a predictive model of the likelihood of a flight being delayed. The following code pares the original 26 columns in the DataFrame down to just six. While the original dataset has a column indicating whether a flight was delayed 15 minutes or more, I’d like a bit more granularity, so I’ll create a new column, named “IsDelayed,” that’s set to 1 if a flight arrives more than 10 minutes late. Additionally, I call the dropna function, which drops any rows with null values. ML algorithms can be finicky about the data inputs they receive. Very often an unexpected null value will throw an exception or, worse, corrupt the results. Enter the following code into a new cell and execute it. The results will show the first 20 rows of model_data:

model_data = flights_df.select("DAY_OF_MONTH", "DAY_OF_WEEK", "ORIGIN_AIRPORT_ID",
  "DEST_AIRPORT_ID", "DEP_DELAY", ((col("ARR_DELAY") >
  10).cast("Int").alias("IsDelayed"))).dropna()
model_data.show()
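
To see how much data dropna will discard, the null values in a given column can be counted directly. A quick sketch checking the arrival delay column (flights that never arrived, such as cancellations, typically have no arrival delay recorded):

# Count the rows where ARR_DELAY is null -- these are among the rows dropna() removes.
null_arrival_delays = flights_df.where(col("ARR_DELAY").isNull()).count()
print ("Rows with a null ARR_DELAY: " + str(null_arrival_delays))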

Supervised Learning

In supervised ML, the ground truth is known. In this case, it’s the on-time arrival records of flights in the United States for June 2015. Based on that data, the algorithm will generate a predictive model of whether a flight’s arrival will be delayed by more than 10 minutes, given five fields: Day of Month, Day of Week, Origin Airport ID, Destination Airport ID and Departure Delay. In ML terms, these fields are collectively referred to as “features.” The predicted value, in this case an indicator of whether the arrival was delayed by more than 10 minutes, is referred to as the “label.”

Supervised ML models are trained and tested against the same dataset, which is randomly split into two subsets: one for training and one for testing. The split usually falls along a 60/40 or 70/30 line, with the larger share going to the training data.

The following code separates the training data and the test data into two sets along a 70/30 line and displays the count (once again, enter the code into a new cell and execute it):

split_model_data = model_data.randomSplit([0.7,0.3])
training_data = split_model_data[0]
testing_data = split_model_data[1]
all_data_count = training_data.count() + testing_data.count()
print ("Rows in Training Data: " + str(training_data.count()))
print ("Rows in Testing Data: " + str(testing_data.count()))
print ("Total Rows: " + str(all_data_count))

There will be a discrepancy between the total rows displayed here and the count shown earlier in the notebook. That’s due to the rows with null values being dropped.
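
Note that randomSplit produces a different split each time the cell runs. Passing a seed makes the split reproducible, which helps when comparing tweaks to the model; a sketch (the seed value is arbitrary):

# A fixed seed makes the 70/30 split repeatable across runs of the notebook.
split_model_data = model_data.randomSplit([0.7, 0.3], seed=42)
training_data, testing_data = split_model_data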

The training data now must be further modified to meet the requirements of the ML algorithm. The five fields representing the features will be combined into an array, or vector, through a process called vectorization, and the IsDelayed column will be renamed to label. The training DataFrame will then have just two columns: features and label. Enter the following code into an empty cell and execute it; the first 20 rows of the training DataFrame will be displayed:

vector_assembler = VectorAssembler(
  inputCols=["DAY_OF_MONTH", "DAY_OF_WEEK", "ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID",
  "DEP_DELAY"], outputCol="features")
training = vector_assembler.transform(training_data).select(col("features"),
  col("IsDelayed").cast("Int").alias("label"))
training.show(truncate=False)

With the training data split into two columns, features and label, it’s ready to be fed to the ML algorithm. In this case, I’ve chosen logistic regression, a statistical method for analyzing data in which one or more input variables influence an outcome. For this model, the input variables are the contents of the features column: the fields DAY_OF_MONTH, DAY_OF_WEEK, ORIGIN_AIRPORT_ID, DEST_AIRPORT_ID and DEP_DELAY. The outcome is the label column: whether the flight was delayed by more than 10 minutes. Note that the algorithm won’t distinguish between a delay of 10 minutes and one second and a delay of 15 hours. The model is created by fitting it to the training data. Again, enter the following code into a blank cell and execute it:

logR = LogisticRegression(labelCol="label", featuresCol="features",
  maxIter=10, regParam=0.1)
model = logR.fit(training)
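
The fitted model exposes the weight learned for each of the five features (in the order they were assembled into the vector), along with an intercept term, which can hint at how much each feature contributes to the prediction. A brief sketch:

# The learned weight for each feature, plus the intercept of the logistic function.
print ("Coefficients: " + str(model.coefficients))
print ("Intercept: " + str(model.intercept))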

With the model trained, the only thing left to do is test it. The testing data must also be modified to fit the expectations of the algorithm by running it through the same vector assembler used for the training data. Enter the following code into a blank cell and execute it:

testing = vector_assembler.transform(testing_data).select(col("features"),
  col("IsDelayed").cast("Int").alias("trueLabel"))
testing.show()

Now that the testing data is prepared, the next step is to run it through the model by calling the transform method. The output is a DataFrame with four columns: the features, the predicted value, the actual value and the probability, a measure of how confident the algorithm was in its prediction. Once more, enter the following code into an empty cell and execute it:

prediction = model.transform(testing)
predicted = prediction.select("features", "prediction", "trueLabel", "probability")
predicted.show()

The output only shows the first 20 rows. That’s not an efficient way of measuring the efficacy of the model. The best way to do that is to count the number of times the algorithm predicted correctly and how many times it was wrong. However, a simple “right and wrong” metric doesn’t always tell the full story. The better metric is something called the “confusion matrix,” which displays the number of true negatives and true positives along with the number of false positives and false negatives. Enter the following code into a blank cell and execute it to display the confusion matrix for this model:

true_negatives = predicted.where( (predicted.prediction == 0.0) &
  (predicted.trueLabel == 0)).count()
true_positives = predicted.where( (predicted.prediction == 1.0) &
  (predicted.trueLabel == 1)).count()
false_negatives = predicted.where( (predicted.prediction == 0.0) &
  (predicted.trueLabel == 1)).count()
false_positives = predicted.where( (predicted.prediction == 1.0) &
  (predicted.trueLabel == 0)).count()
print ( "True Positive: " + str(true_positives) )
print ( "True Negative: " + str(true_negatives) )
print ( "False Positive: " + str(false_positives) )
print ( "False Negative: " + str(false_negatives) )

The results are not encouraging. The model was wrong considerably more than it was right. All is not lost, however. When experiments fail, the best course of action is to analyze the results, tweak the model parameters and try again. This is why the field is called “data science.”

Wrapping Up

Spark is a fast and powerful cluster computing environment for parallel processing of data workloads with a modular architecture. Two modules explored in this article were PySpark and Spark ML. PySpark provides a Python runtime for Spark and a high-level abstraction over resilient distributed datasets (RDDs) in the form of the DataFrame API. The Spark ML library provides a machine learning API for data built on top of DataFrames.

Machine learning is a discipline within the larger field of data science. When an experiment doesn’t yield the desired results, finding the solution requires an iterative approach. Perhaps 10 minutes is too granular an interval. Maybe more than five input fields would help uncover a pattern. Quite possibly one month’s worth of flight data isn’t enough for the algorithm to establish a clear pattern. The only way to know is to keep experimenting, analyzing the results, and adjusting the input data and parameters.


Frank La Vigne leads the Data & Analytics practice at Wintellect and co-hosts the DataDriven podcast. He blogs regularly at FranksWorld.com and you can watch him on his YouTube channel, “Frank’s World TV” (FranksWorld.TV).

Thanks to the following technical expert for reviewing this article: Andy Leonard

