salut! Je veux aborder en détail un sujet intéressant, mais malheureusement pas un sujet dans la documentation Spark: comment entraîner un modèle dans PySpark ML sur un ensemble de données avec différents types de données (chaînes et nombres)? Le désir d'écrire cet article a été causé par la nécessité de naviguer sur Internet pendant plusieurs jours à la recherche de l'article nécessaire avec le code, car le tutoriel officiel de Spark fournit un exemple de travail non seulement avec des signes d'un type de données, mais en général avec un signe, mais des informations sur la façon de travailler avec plusieurs colonnes les types de données les plus différents, il n'y en a pas. Cependant, après avoir étudié en détail les capacités de PySpark pour travailler avec des données, j'ai réussi à écrire du code de travail et à comprendre comment tout se passe, ce que je souhaite partager avec vous. Alors à toute vitesse, mes amis!
Dans un premier temps, importons toutes les bibliothèques nécessaires au travail, puis nous analyserons le code en détail afin que toute "théière rouillée" qui se respecte, comme, d'ailleurs, je l'ai récemment, comprenne tout:
#
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
Créons maintenant un contexte Spark (local) et une session Spark et vérifions si tout fonctionne en l'affichant à l'écran. La création d'une session Spark est le point de départ pour travailler avec des ensembles de données dans Spark:
#
sc = SparkContext('local')
spark = SparkSession(sc)
spark
Il existe un outil pour travailler avec des données, chargeons-le maintenant. L'article utilise un ensemble de données provenant du site du concours d'apprentissage automatique Kaggle:
https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions
qui, après téléchargement, est stocké dans path_csv au format .csv et propose les options suivantes:
- en-tête: si la première ligne de notre fichier est un en-tête, alors on met "true"
- délimiteur: on met un signe qui sépare les données d'une ligne par des signes, souvent c'est "," ou ";"
- inferSchema: si true, PySpark détectera automatiquement le type de chaque colonne, sinon vous devrez l'écrire vous-même
# .csv path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
.option("header", "true")\
.option("delimiter", ",")\
.option("inferSchema", "true")\
.load(path_csv)
Pour mieux comprendre à quel type de données nous traitons, examinons quelques-unes de leurs lignes:
#
data.show()
Voyons également combien de lignes nous avons dans l'ensemble de données:
#
data.select('year').count()
Et enfin, inférons les types de nos données, que, comme nous nous en souvenons, nous avons demandé à PySpark de déterminer automatiquement en utilisant l'option ("inferSchema", "true"):
#
data.printSchema()
Passons maintenant à notre cours principal - travailler avec plusieurs signes de différents types de données. Spark peut entraîner le modèle sur les données transformées, où la colonne prédite est un vecteur et les colonnes avec des fonctionnalités sont également un vecteur, ce qui complique la tâche ... Mais nous n'abandonnons pas, et pour entraîner le modèle dans PySpark nous utiliserons Pipeline, dans lequel nous passerons un certain plan d'action (variable étapes):
- step label_stringIdx: nous transformons la colonne de l'ensemble de données de valeur que nous voulons prédire en une chaîne de vecteur Spark et la renommons pour étiqueter avec le paramètre handleInvalid = 'keep', ce qui signifie que notre colonne prédite prend en charge null
- stringIndexer étape: convertir les colonnes de chaînes en chaînes catégorielles Spark
- encoder: ()
- assembler: Spark, , VectorAssembler(), ( ) (assemblerInputs) «features»
- gbt: PySpark ML GBTRegressor,
#value - -
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]
#depend on categorical columns: country and types of emission
# :
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
#
stringIndexer = StringIndexer(inputCol = categoricalCol,
outputCol = categoricalCol + 'Index',
handleInvalid = 'keep')
encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
outputCol=categoricalCol + "classVec")
stages += [stringIndexer, encoder]
# :
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
# - -
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
Divisons notre ensemble de données en échantillons d'entraînement et de test dans le rapport préféré de 70% à 30%, respectivement, et commençons à entraîner le modèle à l'aide d'un arbre de renforcement de la régression de gradient (GBTRegressor), qui devrait prédire le vecteur d'étiquette en fonction des caractéristiques précédemment combinées en un seul vecteur «caractéristiques» avec limite itérable maxIter = 10:
# (30% )
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# ( )
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]
# stages
pipeline = Pipeline(stages=stages)
Et maintenant, il ne nous reste plus qu'à envoyer à l'ordinateur un plan d'action et un jeu de données de formation:
#
model = pipeline.fit(trainingData)
#
predictions = model.transform(testData)
Sauvegardons notre modèle afin que nous puissions toujours revenir à l'utiliser sans nous recycler:
#
pipeline.write().overwrite().save('model/gbtregr_model')
Et si vous décidez de recommencer à utiliser le modèle entraîné pour les prédictions, écrivez simplement:
#
load_model = pipeline.read().load('model/gbtregr_model')
Nous avons donc examiné comment dans un outil de travail avec des données volumineuses dans le langage Python, PySpark, le travail avec plusieurs colonnes de fonctionnalités de différents types de données est implémenté.
Il est maintenant temps de l'appliquer à vos modèles ...