Aprile 2018

Volume 33 Numero 4

Il presente articolo è stato tradotto automaticamente.

Artificially Intelligent - Introduzione di Apache Spark Machine Learning

Dal Frank La La | 2018 aprile

Frank La VigneApache Spark ML è un modulo di libreria che viene eseguito nel primo Apache Spark di apprendimento. Spark stessa è un ambiente cluster che fornisce un'interfaccia di dati tecnici e gli esperti di dati per la programmazione intero cluster di computer commodity con parallelismo dei dati e tolleranza di errore. Spark supporta numerosi linguaggi, ad esempio Java, Scala, Python e R. Anche in modo nativo fornisce un servizio server Jupyter Notebook. Consultare la colonna febbraio artificialmente intelligente (msdn.com/magazine/mt829269) per le nozioni di base di notebook Jupyter se non si ha familiarità con essi. In questo articolo, esamineranno con Spark ML in un blocco per Appunti Juptyer in un cluster HDInsight in Azure.

Introduzione ad Apache Spark in Azure

Per usare Spark ML, il primo passaggio consiste nel creare un cluster Spark. Accedere al portale di Azure e scegliere "Creazione di una risorsa", quindi scegliere HDInsight, come illustrato figura 1. Pannelli di visualizzati esaminare in dettaglio il processo di creazione di un cluster HDInsight. 

Creazione di una nuova risorsa di HDInsight
Figura 1 creando una nuova risorsa di HDInsight

Nel primo pannello etichettato nozioni di base, vengono illustrate le proprietà fondamentali del cluster, ad esempio nome del Cluster, amministratore e ssh credenziali, nonché il gruppo di risorse e località. Il nome del cluster deve essere univoco all'interno del dominio azurehdinsight. Per riferimento, consultare figura 2. Di particolare importanza è l'opzione di tipo Cluster, che consente di visualizzare un altro pannello. In questo pannello visualizzato nella figura 3, impostare il tipo di cluster Apache Spark e la versione 2.10 Spark (HDI 3.6). Fare clic su Seleziona per salvare l'impostazione e chiudere il pannello.

Il passaggio di nozioni di base di rapido creare il processo per la configurazione di un Cluster HDInsight
Figura 2 il passaggio di nozioni di base di rapido creare il processo per la configurazione di un Cluster HDInsight

Scelta di Spark come tipo di Cluster per il Cluster HDInsight
Figura 3 scelta Spark come tipo di Cluster per il Cluster HDInsight

Il passaggio successivo consente di configurare le opzioni di archiviazione per il cluster. Lasciare il tipo di archiviazione primaria e metodo di selezione impostate sui valori predefiniti. Contenitore di archiviazione, fare clic su Crea nuovo e denominarla "msdnsparkstorage" e impostare il contenitore predefinito per "sparkstorage" nella casella di testo contenitore predefinito. Fare clic su Avanti per proseguire con il passaggio di riepilogo del processo di installazione. Questa schermata offre una revisione e la possibilità di modificare l'installazione del cluster con il costo orario stimato per l'esecuzione del cluster. Prestare particolare attenzione per eliminare sempre i cluster quando non è in uso. A differenza delle macchine virtuali in Azure, i cluster HDInsight sono disponibile un'opzione per sospendere e arrestare di fatturazione. Fare clic su Crea e prendere nota della notifica che può richiedere fino a 20 minuti per creare un'istanza del cluster.

Archiviazione Blob di Azure e HDFS

Spark, ad esempio Hadoop, utilizza Hadoop Distributed File System (HDFS) come archivio di file a livello di cluster. HDFS è progettato per archiviare grandi set di dati in modo affidabile e rendere questi set di dati rapidamente accessibili alle applicazioni in esecuzione nel cluster. Come suggerisce il nome, HDFS ha avuto origine all'interno di Hadoop e sono supportati anche da Spark.

Quando si esegue il cluster HDInsight, BLOB di archiviazione Azure eseguire facilmente il mapping a HDFS. Ciò rende più semplice da caricare e scaricare i dati da e verso l'archivio di file HDFS associati al cluster Spark. In realtà, il primo passaggio per ottenere il progetto di avvio verrà utilizzato Esplora archivi Azure per caricare il file di dati con cui operare. Azure Storage Explorer è un'utilità gratuita e multipiattaforma per gestire gli archivi dati in Azure. Se non si dispone già installata, eseguire la distribuzione durante l'attesa per il cluster Spark inizializzare (azure.microsoft.com/features/storage-explorer).

Una volta configurato il cluster e Azure Storage Explorer è configurato per accedere all'account di archiviazione creati per il cluster in figura 2aprire Azure Storage Explorer e passare al blob "msdndemo" utilizzando il controllo struttura ad albero a sinistra ( Figura 4). Fare clic su "msdndemo" per visualizzare il contenuto nella radice del blob, quindi fare clic su nuova cartella e nella finestra di dialogo Crea nuova Directory virtuale, immettere il nome per la nuova cartella: voli. Fare clic su OK per creare la cartella. Successivamente, fare clic sul pulsante Carica scegliere Carica file, fare clic sul pulsante con puntini di sospensione e individuare il file di dati CSV per l'articolo, "06-2015.csv." Fare clic su Carica per caricare il file di archivio Blob.

Creazione di una nuova cartella nel contenitore Blob creato per il Cluster Spark
Figura 4 creando una nuova cartella nel contenitore Blob creato per il Cluster Spark

Ora che viene caricato il file di dati, è necessario iniziare a lavorare con il file in un blocco per Appunti PySpark. L'API di Python Spark, commonlyreferred a come PySpark, espone il modello di programmazione Spark per Python. Per gli sviluppatori abituati a Python, PySpark risulterà molto familiare. Il sito Web di Spark viene fornita una spiegazione introduttiva ideale per l'ambiente e delle differenze rispetto alla Python standard (bit.ly/2oVBuCy).

Notebook Jupyter Spark

L'implementazione di HDInsight di Apache Spark include un'istanza di notebook Jupyter già in esecuzione nel cluster. Il modo più semplice per accedere all'ambiente è per passare al pannello cluster Spark nel portale di Azure. Nella scheda Panoramica, fare clic su uno degli elementi con etichettati Cluster Dashboard (figura 5). Nel pannello visualizzato, fare clic sul riquadro Server Jupyter Notebook. Se richieste le credenziali, utilizzare le credenziali dell'account di accesso cluster creati in precedenza.

Avviare l'ambiente notebook Jupyter per il Cluster Spark
Figura 5 avviato l'ambiente di notebook Jupyter per il Cluster Spark

Una volta che la home page per il servizio del cluster Jupyter notebook caricato, fare clic su Nuovo e quindi scegliere PySpark3 per creare un nuovo blocco appunti PySpark3, come illustrato nella figura 6.

Creazione di un server Jupyter Notebook PySpark 3 nuovi
Figura 6 la creazione di un server Jupyter Notebook PySpark 3 nuovi

In questo modo verrà creato un nuovo blocco appunti vuoto con una cella vuota. In questa prima cella, immettere il codice seguente per caricare il file CSV caricato in precedenza per il blob. Prima di premere CTRL + INVIO sulla tastiera per eseguire il codice, esaminare innanzitutto il codice:

flights_df = spark.read.csv('wasb:///flights/06-2015.csv',
  inferSchema=True, header=True)

Si noti il gestore di protocollo "wasb" nell'URL. WASB è l'acronimo di BLOB di archiviazione Windows Azure e fornisce un'interfaccia tra archiviazione Hadoop e Blob di Azure. Per ulteriori informazioni su come questa operazione viene eseguita e motivi per cui questo aspetto è rilevante, vedere il post di blog, "Perché WASB rende Hadoop in Azure così molto interessante" (bit.ly/2oUXptz) e "Understanding WASB e Hadoop archiviazione in Azure" (bit.ly/2ti43zu), entrambi scritto dallo sviluppatore di Microsoft per Cindy Gross. Per il momento, tuttavia, le informazioni chiave sono che i BLOB di Azure può fungere da file persistenti vengono archiviati per i dati anche quando il cluster Spark non è in esecuzione. Inoltre, i dati memorizzati nella tabella sono accessibili per le applicazioni che supportano l'archiviazione Blob di Azure o HDFS.

Con lo stato attivo nella casella di testo, premere CTRL + INVIO. Immediatamente, lo spazio di sotto della cella leggerà "Applicazione di Spark avvio in corso". Dopo qualche istante, viene visualizzata una tabella con alcuni dati sul processo che è stata eseguita e una notifica che la "SparkSession è disponibile come 'spark'." I parametri passati lungo per il spark.read.csv metodo automaticamente dedurre uno schema e indicato che il file contiene una riga di intestazione. Il contenuto del file CSV sono stato caricato in un oggetto frame di dati. Per visualizzare lo schema, immettere il codice seguente nella cella vuota appena creata e quindi eseguire il codice premendo CTRL + INVIO:

flights_df.printSchema()

Lo schema viene visualizzato il nome e il tipo di dati di ogni campo. I nomi corrispondano ai nomi dei campi dalla riga di intestazione del file CSV.

Frame di dati in modo dettagliato

In Spark, frame di dati sono una raccolta di righe con colonne denominate distribuita. Praticamente generale, il frame di dati forniscono un'interfaccia simile alle tabelle nei database relazionali o un foglio di foglio di lavoro di Excel con le intestazioni di colonna denominata. Dietro le quinte, frame di dati forniscono un'API in una struttura di dati a fondamentale livello inferiore in Spark: un dataset distribuito resiliente (RDD). RDDs sono una raccolta di oggetti che possa essere elaborata in parallelo tra i nodi di lavoro in un cluster Spark a tolleranza di errore, non modificabile e distribuita.

RDDs stessi sono suddivisi in sezioni più piccole le partizioni di chiamata. Spark determina automaticamente il numero di partizioni in cui si desidera dividere un RDD. Le partizioni vengono distribuite tra i nodi nel cluster. Quando viene eseguita un'azione su un RDD, ognuna delle relative partizioni viene avviata un'attività e l'azione viene eseguita in parallelo. Fortunatamente, per quelle nuove per Spark, la maggior parte dell'architettura viene estratto: primo dalla struttura di data RDD e Spark, quindi dall'astrazione di alto livello dell'API di frame di dati.

Esplorazione dei dati con frame di dati

Frame di dati espongono diversi modi per esplorare i dati. Per visualizzare la forma dei dati, ad esempio, immettere il codice seguente in una cella vuota e quindi eseguire il codice:

recordCount = flights_df.count()
column_count = len(flights_df.columns)
print ("DataFrame has " + str(recordCount) + "rows and " + str(
  column_count) + " columns.")

La risposta risultante leggerà:

DataFrame has 503897 rows and 26 columns.

Per visualizzare solo i voli lasciando aeroporto BWI, immettere quanto segue in una cella vuota ed eseguire:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI')
flights_leaving_BWI.show()

Il codice creato un nuovo frame di dati nel flights_leaving_BWI con i risultati filtrati dalla flights_df frame di dati. Il metodo show Visualizza le prime 20 righe. I risultati potrebbero essere poco poco professionale a causa della natura dei vincoli formattazione della pagina Web e il frame di dati contenente 26 colonne. Destra a questo punto, desidera visualizzare solo il vettore airline, numero di volo, giorno del mese e volo destinazione. A tale scopo, modificare il codice nella cella al seguente ed eseguire di nuovo:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI').select("UNIQUE_CARRIER",
  "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "DEST")
flights_leaving_BWI.show()

La formattazione avrà un aspetto migliore sono presenti solo cinque colonne restituito e si adatterà meglio nella pagina Web. Cosa accade se si desidera ordinare i risultati dalla destinazione di volo e Mostra le prime 100 righe anziché i primi 20? Modificare il codice nella cella a quanto segue per hanno esattamente questo scopo:

flights_leaving_BWI = flights_df.where(flights_df.ORIGIN == 'BWI').select("UNIQUE_CARRIER",
  "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "DEST").sort("DEST")
flights_leaving_BWI.show(100)

Cosa accade se si desidera creare un frame di dati di tutti i voli in partenza tre principali quelle degli aeroporti dell'area di controller di dominio? A tale scopo, immettere il codice seguente in una nuova cella ed eseguire:

flights_leaving_DCmetro = flights_df.where( (flights_df.ORIGIN == 'BWI') |
  (flights_df.ORIGIN == 'DCA') | (flights_df.ORIGIN == 'IAD') ).select(
  "UNIQUE_CARRIER", "FL_NUM", "TAIL_NUM",  "DAY_OF_MONTH", "ORIGIN", "DEST")
flights_leaving_DCmetro.show()

Si noti che sono stati aggiunti il campo di origine in questo frame di dati da visualizzare da dove un volo ha avuto originato.

Creare una nuova cella, immettere il codice seguente ed eseguirlo:

print ("Flights leaving BWI: " + str(flights_leaving_BWI.count()))
print ("Flights leaving DC Metro: " + str(flights_leaving_DCmetro.count()))

I risultati restituiti dovrebbero essere simile al seguente:

Flights leaving BWI: 8422
Flights leaving DC Metro: 18502

Come si può notare, l'utilizzo di frame di dati è intuitiva e semplice. Il pesante per parallelizzare l'esecuzione di query sui dati viene eseguito dal motore di base di Spark. Inoltre, chiunque abbia familiarità con Hive o Pig noterà che il tempo di esecuzione in queste query notevolmente più veloce di Spark.

Creazione di un modello predittivo con SparkML

Per creare un modello predittivo con Spark ML, è necessario importare alcune librerie nel progetto. A tale scopo, creare una nuova cella e immettere il codice seguente, eseguirlo:

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

Successivamente, è necessario creare un nuovo frame di dati con solo i campi: è necessario creare un modello predittivo della probabilità di un volo viene ritardato. Il codice seguente pares verso il basso le colonne originali 26 nel frame di dati e solo sei. Mentre il set di dati originale include una colonna che indica se un volo è stato ritardato 15 minuti o più, desidera aggiungere alcune maggiore granularità. Verrà se un volo viene posticipato 10 minuti o più, creare un nuovo set di colonne su 1 e nome della colonna "IsDelayed". Inoltre, è possibile chiamare la funzione dropna, che elimina tutte le righe con valori null. Gli algoritmi di Machine Learning è possibile che su ricevono gli input di dati. Molto spesso un valore null imprevisto verrà generata un'eccezione o peggio, danneggiare i risultati. Immettere il codice seguente in una nuova cella e lo esegue. I risultati mostreranno i primi 20 righe di model_data:

model_data = flights_df.select("DAY_OF_MONTH", "DAY_OF_WEEK", "ORIGIN_AIRPORT_ID",
  "DEST_AIRPORT_ID", "DEP_DELAY", ((col("ARR_DELAY") >
  10).cast("Int").alias("IsDelayed"))).dropna()
  model_data.show()

Apprendimento supervisionato

In Machine Learning supervisionato, dati verificati sono noto. È in questo caso, i record in fase di arrivo dei voli negli Stati Uniti per mese di giugno 2015. Basati su tali dati, l'algoritmo genera un modello predittivo in indica se verrà ritardato in arrivo del volo da 10 minuti o più assegnato cinque campi: Giorno del mese, giorno della settimana, ID origine aeroporto, ID di quello di destinazione e il ritardo di partenza. In termini di Machine Learning, tali campi vengono detti collettivamente come "funzionalità". Il valore stimato, in questo caso un indicatore di un ritardo di arrivo di 10 minuti o meno di 10 minuti, è detto "label".

Modelli di Machine Learning supervisionati spesso vengono testati con stesso set di dati su cui stai training. A tale scopo, esegue in modo casuale suddiviso in due set di dati: uno per il training e uno per il testing. È in genere, suddividere lungo una linea/40, 60 o 70/30 con la condivisione di dimensioni maggiore verso i dati di training.

Il codice seguente separa i dati di training e i dati di test in due imposta lungo una linea di 70/30 e visualizza il conteggio (ancora una volta, immettere il codice in una nuova cella ed eseguirlo):

split_model_data = model_data.randomSplit([0.7,0.3])
training_data = split_model_data[0]
testing_data = split_model_data[1]
all_data_count = training_data.count() + testing_data.count()
print ("Rows in Training Data: " + str(training_data.count()))
print ("Rows in Testing Data: " + str(testing_data.count()))
print ("Total Rows: " + str(all_data_count))

Sarà presente una discrepanza nelle righe di totali visualizzate qui e versioni precedenti del blocco note. Ovvero a causa di righe con valori null in corso di eliminazione.

I dati di test ora devono essere ulteriormente modificati per soddisfare i requisiti dell'algoritmo di Machine Learning. I cinque campi che rappresentano le funzionalità verranno combinati in una matrice o un vettore, mediante un processo denominato la vettorizzazione. La colonna IsDelayed verrà rinominata all'etichetta. Il frame di dati di training sarà presenti solo due colonne: funzionalità e l'etichetta. Immettere il codice seguente in una cella vuota ed eseguita e le righe primi 20 di training che verranno visualizzati i frame di dati:

vector_assembler = VectorAssembler(
  inputCols = ["DAY_OF_MONTH", "DAY_OF_WEEK", "ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID",
  "DEP_DELAY"], outputCol="features")
training = vector_assembler.transform(training_data).select(col("features"),
  col("IsDelayed").cast("Int").alias("label"))
  training.show(truncate=False)

Con i dati di training suddiviso in due colonne, le funzionalità e le etichette, è pronto per essere utilizzato per l'algoritmo di Machine Learning. In questo caso, ho scelto la regressione logistica. La regressione logistica è un metodo statistico per analizzare i dati in cui uno o più variabili di input influenzano un risultato. Per questo modello, le variabili di input sono il contenuto della colonna di funzionalità, i campi DAY_OF_MONTH, DAY_OF_WEEK, ORIGIN_AIRPORT_ID, DEST_AIRPORT_ID e DEP_DELAY. Il risultato è la colonna di etichetta, o se il volo è stato ritardato più di 10 minuti. Questo algoritmo non sarà possibile distinguere tra un ritardo di 10 minuti e un secondo e un ritardo di 15 ore. La creazione del modello adattando i dati di training a esso. Anche in questo caso immettere il codice seguente in una cella vuota ed eseguirlo:

logR = LogisticRegression(labelCol="label", featuresCol="features",
  maxIter=10, regParam=0.1)
model = logR.fit(training)

Con il modello sottoposto a training, il solo a questo punto è eseguirne il test. I dati di testing devono anche essere modificati per adattare le aspettative dell'algoritmo eseguendolo mediante l'assembler vettore come dati di training sono. Immettere il codice seguente in una cella vuota ed eseguirlo:

testing = vector_assembler.transform(testing_data).select(col("features"),
  (col("IsDelayed")).cast("Int").alias("trueLabel"))
  testing.show()

Ora che i dati di training vengano preparati, il passaggio successivo consiste per eseguirlo tramite il modello chiamando il metodo di trasformazione. L'output è un frame di dati con quattro colonne: le funzionalità, il valore stimato, il valore effettivo e la probabilità, una misura dell'algoritmo che è stato in base alla stima. Ancora una volta, immettere il codice seguente in una cella vuota ed eseguirlo:

prediction = model.transform(testing)
predicted = prediction.select("features", "prediction", "trueLabel", "probability")
predicted.show()

L'output mostra solo i primi 20 righe. Che non è un modo efficiente per misurare l'efficacia del modello. Il modo migliore per eseguire operazioni che consiste nel contare il numero di volte in cui che l'algoritmo stimati correttamente e il numero di volte è errato. Tuttavia, una semplice metrica "destra e non corretta" non sempre storia la completa. La metrica più indicativa è che un elemento denominato "confusione matrice" che visualizza il numero di veri e veri positivi insieme al numero di falsi positivi e falsi negativi. Immettere il codice seguente in una cella vuota ed eseguirlo per visualizzare la matrice di confusione per questo modello:

true_negatives = predicted.where( (predicted.prediction == '0.0') |
  (predicted.trueLabel == '0')).count()
true_positives = predicted.where( (predicted.prediction == '1.0') |
  (predicted.trueLabel == '1')).count()
false_negatives = predicted.where( (predicted.prediction == '0.0') |
  (predicted.trueLabel == '1')).count()
false_positives = predicted.where( (predicted.prediction == '1.0') |
  (predicted.trueLabel == '0')).count()
print ( "True Positive: " + str(true_positives) )
print ( "True Negative: " + str(true_negatives) )
print ( "False Positive: " + str(false_positives) )
print ( "False Negative: " + str(false_negatives) )

I risultati non sono incoraggianti. Il modello è errato notevolmente più era a destra. Tutti non è andato perso, tuttavia. Quando si verificano errori esperimenti, la migliore linea di azione è per analizzare i risultati, modificare i parametri di modello e riprovare. Ecco perché il campo viene chiamato "analisi scientifica dei dati".

Conclusioni

Spark è un ambiente di elaborazione di cluster rapido ed efficiente per l'elaborazione parallela dei carichi di lavoro di dati con un'architettura modulare. Due moduli sono stati illustrati in questo articolo sono state PySpark e ML Spark. PySpark fornisce un fase runtime di Python per Spark e astrazione di alto livello di resilienza Distributed set di dati (RDDs) sotto forma di un'API di frame di dati. La libreria di Spark ML fornisce un'API di apprendimento automatico per i dati basati su frame di dati.

Machine learning è una disciplina con il campo più grande dell'analisi scientifica dei dati. Quando un esperimento non produce i risultati desiderati, trovare la soluzione richiede un approccio iterativo. Ad esempio 10 minuti è troppo granulari un intervallo. Potrebbe essere più di cinque campi di input consente di rilevare un modello. Forse volo dati relativi a un mese non è sufficiente per l'algoritmo stabilire un chiaro schema. L'unico modo per conoscere è per mantenere la sperimentazione, analisi dei risultati e regolare i parametri e dati di input.


Frank La Vignecomporta la pratica di dati & Analitica Wintellect e CO-host il podcast DataDriven. Blog di he periodicamente presso FranksWorld.com ed è possibile guardare lui sul suo canale YouTube, "Di Frank mondo TV" (FranksWorld.TV).

Grazie al seguente esperto tecnico per la revisione dell'articolo: Andy Leonard


Viene illustrato in questo articolo nel forum di MSDN Magazine