Testing in Apache Spark Structured Streaming

Introduction

Currently, there are not many examples of tests for applications based on Spark Structured Streaming, so this article provides basic test examples with detailed descriptions.

All examples use Apache Spark 3.0.1.

Preparation

You need to install:

  • Apache Spark 3.0.x
  • Python 3.7 and a virtual environment for it
  • Conda 4.y
  • scikit-learn 0.22.z
  • Maven 3.v
  • The Scala examples use version 2.12.10.

  1. Download Apache Spark.
  2. Unpack it: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
  3. Create an environment, for example with conda: conda create -n sp python=3.7

The environment variables must be set. Here is an example for a local run.

SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip

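To check that the variables are picked up, you can create a local session from Python (a minimal sketch; the master and the application name are arbitrary):

from pyspark.sql import SparkSession

# If SPARK_HOME and PYTHONPATH are set correctly, the import and the session start succeed.
spark = SparkSession.builder.master("local[2]").appName("env-check").getOrCreate()
print(spark.version)  # expected: 3.0.1
spark.stop()
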
Tests

An example with scikit-learn

When writing tests, you should split your code so that the logic can be isolated from the actual use of the final API. A good example of such an isolation boundary: a pandas DataFrame versus a Spark DataFrame.

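To illustrate the idea (a minimal sketch; the function and the column names are made up), the logic below depends only on pandas, so it can be unit-tested without Spark and later applied to a Spark DataFrame, for example through a pandas UDF:

import pandas as pd

def add_total(df: pd.DataFrame) -> pd.DataFrame:
    # Pure logic: no Spark imports, trivially unit-testable.
    out = df.copy()
    out["total"] = out["price"] * out["quantity"]
    return out

# A unit test without any Spark dependency:
result = add_total(pd.DataFrame({"price": [2.0, 3.0], "quantity": [3, 4]}))
assert list(result["total"]) == [6.0, 12.0]
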
The following example will be used for writing the tests: LinearRegression.

So let's test the code using the following Python "template":

class XService:

    def __init__(self):
        # Create the model.
        pass

    def train(self, ds):
        # Train the model on the given dataset.
        pass

    def predict(self, ds):
        # Predict with the trained model and output the results.
        pass

For Scala, the template looks the same (see the full Scala example below).

The full example:

from sklearn import linear_model

class LocalService:

    def __init__(self):
        # Create the model.
        self.model = linear_model.LinearRegression()

    def train(self, ds):
        # Train on a (features, labels) tuple.
        X, y = ds
        self.model.fit(X, y)

    def predict(self, ds):
        # Predict and print the results.
        r = self.model.predict(ds)
        print(r)

Now the test.

Imports:

import unittest
import numpy as np

The main class:

class RunTest(unittest.TestCase):

Running the tests:

if __name__ == "__main__":
    unittest.main()

Data preparation:

X = np.array([
    [1, 1],  # 6
    [1, 2],  # 8
    [2, 2],  # 9
    [2, 3]   # 11
])
y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3

Creating and training the model:

service = local_service.LocalService()
service.train((X, y))

Getting the results:

service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))

The output:

[16.]
[19.]

All together:

import unittest
import numpy as np

from spark_streaming_pp import local_service

class RunTest(unittest.TestCase):
    def test_run(self):
        # Prepare data.
        X = np.array([
            [1, 1],  # 6
            [1, 2],  # 8
            [2, 2],  # 9
            [2, 3]   # 11
        ])
        y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3
        # Create the model and train it.
        service = local_service.LocalService()
        service.train((X, y))
        # Predict and print the results.
        service.predict(np.array([[3, 5]]))
        service.predict(np.array([[4, 6]]))
        # [16.]
        # [19.]

if __name__ == "__main__":
    unittest.main()

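Note that predict above prints instead of returning, so this test cannot fail on a wrong prediction. A variant with real assertions could look like this (a sketch that calls the scikit-learn model directly; in practice predict could be changed to return its result):

import unittest
import numpy as np
from sklearn import linear_model

class AssertingRunTest(unittest.TestCase):
    def test_predictions(self):
        # The same training data as above.
        X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]])
        y = np.dot(X, np.array([1, 2])) + 3
        model = linear_model.LinearRegression()
        model.fit(X, y)
        # Assert on the predictions instead of printing them.
        np.testing.assert_allclose(model.predict(np.array([[3, 5]])), [16.0])
        np.testing.assert_allclose(model.predict(np.array([[4, 6]])), [19.0])

if __name__ == "__main__":
    unittest.main()
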
An example with Spark and Python

This time we use Spark's own LinearRegression. Structured Streaming is built on top of the DataFrame API and Spark SQL, so the same service template can be reused.

Creating the model:

self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None

Training:

self.model = self.service.fit(ds)

Prediction:

transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q

The full example:

from pyspark.ml.regression import LinearRegression

class StructuredStreamingService:
    def __init__(self):
        self.service = LinearRegression(maxIter=10, regParam=0.01)
        self.model = None

    def train(self, ds):
        self.model = self.service.fit(ds)

    def predict(self, ds):
        # Transform the streaming DataFrame and write the predictions to the console sink.
        transformed_ds = self.model.transform(ds)
        q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
        return q

Now the test.

First, prepare the training data as a static DataFrame.

train_ds = spark.createDataFrame([
    (6.0, Vectors.dense([1.0, 1.0])),
    (8.0, Vectors.dense([1.0, 2.0])),
    (9.0, Vectors.dense([2.0, 2.0])),
    (11.0, Vectors.dense([2.0, 3.0]))
],
    ["label", "features"]
)

Now the data for prediction.

Here there is a difficulty: this is Structured Streaming, so the input must be a streaming DataFrame, while createDataFrame produces a static one.

Let's look at how this is handled in the tests of Spark itself.

def test_stream_read_options_overwrite(self):
    bad_schema = StructType([StructField("test", IntegerType(), False)])
    schema = StructType([StructField("data", StringType(), False)])
    df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
        .schema(bad_schema)\
        .load(path='python/test_support/sql/streaming', schema=schema, format='text')
    self.assertTrue(df.isStreaming)
    self.assertEqual(df.schema.simpleString(), "struct<data:string>")

So a streaming DataFrame for tests can simply be read from files.

Creating the session:

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Preparing the training data (a static DataFrame):

train_ds = spark.createDataFrame([
    (6.0, Vectors.dense([1.0, 1.0])),
    (8.0, Vectors.dense([1.0, 2.0])),
    (9.0, Vectors.dense([2.0, 2.0])),
    (11.0, Vectors.dense([2.0, 3.0]))
],
    ["label", "features"]
)

Creating and training the model:

service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)

Prediction. Note that the streaming data is read from text files, so the features and the label have to be extracted from each line; the query is then given 3 seconds to process the input.

def extract_features(x):
    values = x.split(",")
    features_ = []
    for i in values[1:]:
        features_.append(float(i))
    features = Vectors.dense(features_)
    return features

extract_features_udf = udf(extract_features, VectorUDT())

def extract_label(x):
    values = x.split(",")
    label = float(values[0])
    return label

extract_label_udf = udf(extract_label, FloatType())

predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
    .withColumn("features", extract_features_udf(col("value"))) \
    .withColumn("label", extract_label_udf(col("value")))

service.predict(predict_ds).awaitTermination(3)

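For reference, the files under data/structured_streaming are expected to contain CSV-like lines with the label first and the feature values after it. Content consistent with the output below would be (hypothetical, the actual files are not shown in the article):

1.0,3.0,5.0
2.0,4.0,6.0
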
The result:

15.96699
18.96138

All together:

import unittest
import warnings

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import Vectors, VectorUDT

from spark_streaming_pp import structure_streaming_service

class RunTest(unittest.TestCase):
    def test_run(self):
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        # Prepare data.
        train_ds = spark.createDataFrame([
            (6.0, Vectors.dense([1.0, 1.0])),
            (8.0, Vectors.dense([1.0, 2.0])),
            (9.0, Vectors.dense([2.0, 2.0])),
            (11.0, Vectors.dense([2.0, 3.0]))
        ],
            ["label", "features"]
        )
        # Create the model and train it.
        service = structure_streaming_service.StructuredStreamingService()
        service.train(train_ds)

        # Predict and print the results.

        def extract_features(x):
            values = x.split(",")
            features_ = []
            for i in values[1:]:
                features_.append(float(i))
            features = Vectors.dense(features_)
            return features

        extract_features_udf = udf(extract_features, VectorUDT())

        def extract_label(x):
            values = x.split(",")
            label = float(values[0])
            return label

        extract_label_udf = udf(extract_label, FloatType())

        predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
            .withColumn("features", extract_features_udf(col("value"))) \
            .withColumn("label", extract_label_udf(col("value")))

        service.predict(predict_ds).awaitTermination(3)

        # +-----+------------------+
        # |label|        prediction|
        # +-----+------------------+
        # |  1.0|15.966990887541273|
        # |  2.0|18.961384020443553|
        # +-----+------------------+

    def setUp(self):
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)

if __name__ == "__main__":
    unittest.main()

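The console sink only prints, so this test cannot fail on wrong predictions either. One way to assert on streaming output from Python is Spark's memory sink (a minimal sketch; predict_and_collect is a hypothetical helper, not part of the service above):

def predict_and_collect(model, ds, spark):
    # Write the predictions into the in-memory sink instead of the console,
    # wait until all available input is processed, then read the rows back.
    q = (model.transform(ds)
         .select("label", "prediction")
         .writeStream
         .format("memory")
         .queryName("predictions")
         .start())
    q.processAllAvailable()
    rows = spark.sql("SELECT label, prediction FROM predictions").collect()
    q.stop()
    return rows

The returned rows can then be checked with ordinary assertions such as self.assertAlmostEqual.
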
For Scala things are simpler: the streaming input for the test can be built directly in memory with MemoryStream.

Prediction:

implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)

The full Scala example (the service; the prediction is implemented with a UDF and Spark SQL):

package aaa.abc.dd.spark_streaming_pr.cluster

import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.StreamingQuery

class StructuredStreamingService {
  var service: LinearRegression = _
  var model: LinearRegressionModel = _

  def train(ds: DataFrame): Unit = {
    service = new LinearRegression().setMaxIter(10).setRegParam(0.01)
    model = service.fit(ds)
  }

  def predict(ds: DataFrame): StreamingQuery = {
    // Broadcast the trained model so the UDF can use it on the executors.
    val m = ds.sparkSession.sparkContext.broadcast(model)

    def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {
      m.value.predict(features)
    }

    val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun

    val predictUdf = udf(transform)

    val predictionDs = ds.withColumn("prediction", predictUdf(ds("features")))

    predictionDs
      .writeStream
      .foreachBatch((r: DataFrame, i: Long) => {
        r.show()
        // scalastyle:off println
        println(s"$i")
        // scalastyle:on println
      })
      .start()
  }
}

The test:

package aaa.abc.dd.spark_streaming_pr.cluster

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.{Matchers, Outcome, fixture}

class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {
  test("run") { spark =>
    // Prepare data.
    val trainDs = spark.createDataFrame(Seq(
      (6.0, Vectors.dense(1.0, 1.0)),
      (8.0, Vectors.dense(1.0, 2.0)),
      (9.0, Vectors.dense(2.0, 2.0)),
      (11.0, Vectors.dense(2.0, 3.0))
    )).toDF("label", "features")
    // Create the model and train it.
    val service = new StructuredStreamingService()
    service.train(trainDs)
    // Predict and print the results.
    implicit val sqlCtx = spark.sqlContext
    import spark.implicits._
    val source = MemoryStream[Record]
    source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
    source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
    val predictDs = source.toDF()
    service.predict(predictDs).awaitTermination(2000)
    // +-----+---------+------------------+
    // |label| features|        prediction|
    // +-----+---------+------------------+
    // |  1.0|[3.0,5.0]|15.966990887541273|
    // |  2.0|[4.0,6.0]|18.961384020443553|
    // +-----+---------+------------------+
  }

  // Provide each test with a fresh local SparkSession and stop it afterwards.
  override protected def withFixture(test: OneArgTest): Outcome = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    try withFixture(test.toNoArgTest(spark))
    finally spark.stop()
  }

  override type FixtureParam = SparkSession

  case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)
}

In summary: where possible, separate the logic from the final API, as this makes testing much easier. For full integration tests, a real source such as Kafka can be used.

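For illustration, a Kafka-based streaming input might look like this (a sketch only; the broker address and the topic name are placeholders, and the spark-sql-kafka package must be on the classpath):

predict_ds = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "predictions-in") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS value")
# From here the same UDFs as above can extract "features" and "label".
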
For unit tests, the streaming “DataFrame” can be emulated, as shown above.

In Python this is done by reading the stream from files, in Scala with MemoryStream.