Le streaming structuré a été introduit pour la première fois dans Apache Spark 2.0. Cette plate-forme s'est imposée comme le meilleur choix pour créer des applications de streaming distribuées. L'unification de l'API SQL / Dataset / DataFrame et des fonctions Spark intégrées permet aux développeurs de mettre en œuvre beaucoup plus facilement leurs besoins essentiels complexes tels que l'agrégation de flux, la jointure flux-flux et la prise en charge du fenêtrage. Depuis la sortie de Structured Streaming, les développeurs ont demandé à améliorer le contrôle du streaming, tout comme nous l'avons fait dans Spark Streaming (comme DStream). Dans Apache Spark 3.0, nous avons publié une nouvelle interface utilisateur pour le streaming structuré.
La nouvelle interface utilisateur de streaming structuré offre un moyen simple de surveiller toutes les tâches de streaming avec des informations et des statistiques exploitables, ce qui facilite la résolution des problèmes lors du débogage et améliore la visibilité de la production avec des mesures en temps réel. L'interface utilisateur présente deux ensembles de statistiques: 1) des informations agrégées sur un travail de requête en continu et 2) des informations statistiques détaillées sur les demandes en continu, y compris le taux d'entrée, le taux de traitement, les lignes d'entrée, la durée du lot, la durée de l'opération, etc.
Informations agrégées sur les tâches de requête en streaming
Lorsqu'un développeur soumet une requête SQL de streaming, elle apparaît dans l'onglet Streaming structuré, qui comprend à la fois les requêtes de streaming actives et les requêtes terminées. Le tableau des résultats fournira des informations de base concernant les demandes de diffusion en continu, y compris le nom de la demande, son statut, son ID, son ID d'exécution, l'heure d'envoi, la durée de la demande, l'ID du dernier paquet, ainsi que des informations agrégées telles que le taux de réception moyen et le taux de traitement moyen. Il existe trois types d'état de demande de diffusion en continu: EN COURS D'EXÉCUTION, FINI et ÉCHEC. Toutes les demandes FINISHED et FAILED sont répertoriées dans le tableau des demandes de streaming terminées. La colonne Erreur affiche les détails de l'exception de demande ayant échoué.
Nous pouvons voir les statistiques détaillées de la demande de streaming en cliquant sur le lien Run ID.
Informations statistiques détaillées
La page Statistiques affiche des métriques, notamment le taux d'ingestion / de traitement, la latence et la durée d'opération détaillée, qui sont utiles pour comprendre l'état de vos demandes de streaming, ce qui facilite le débogage des anomalies dans le traitement des demandes.
Il contient les métriques suivantes:
- Taux d'entrée : taux agrégé (pour toutes les sources) d'arrivée des données.
- Taux de traitement : taux agrégé (pour toutes les sources) auquel Spark traite les données.
- Durée du lot : la durée de chaque lot.
- Durée de l'opération : le temps nécessaire pour effectuer diverses opérations en millisecondes.
Les transactions surveillées sont répertoriées ci-dessous:
addBatch
: temps passé à lire les données d'entrée du micro lot à partir des sources, à les traiter et à écrire les données de sortie du lot à synchroniser. Cela prend généralement la majeure partie du temps du micro-lot.getBatch
: temps nécessaire pour préparer une requête logique de lecture des données d'entrée du micropackage actuel à partir des sources.getOffset
: temps passé à demander aux sources si elles ont de nouveaux intrants.walCommit
: Écrit les décalages dans les journaux de métadonnées.queryPlanning
: Créez un plan d'exécution.
Il convient de noter que toutes les opérations répertoriées ne seront pas affichées dans l'interface utilisateur. Il existe différentes opérations avec différents types de sources de données, de sorte que certaines des opérations répertoriées peuvent être effectuées dans une seule demande de streaming.
DĂ©pannage des performances de streaming Ă l'aide de l'interface utilisateur
Dans cette section, nous examinerons quelques cas où la nouvelle interface utilisateur en streaming structuré indique que quelque chose d'extraordinaire se produit. Une demande de démonstration de haut niveau ressemble à ceci, et dans chaque cas, nous supposerons quelques conditions préalables:
import java.util.UUID
val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
Latence accrue en raison d'une puissance de traitement insuffisante
Dans le premier cas, nous exécutons une requête pour traiter les données Apache Kafka dès que possible. Pour chaque lot, une tâche de streaming traite toutes les données disponibles dans Kafka. Si la puissance de traitement est insuffisante pour traiter les données en rafale, la latence augmentera rapidement. Le jugement le plus intuitif est que les lignes d'entrée et la durée du lot augmenteront de manière linéaire. Le paramètre Input Rows spécifie que la tâche de streaming peut traiter un maximum de 8 000 écritures par seconde. Mais le taux d'entrée actuel est d'environ 20 000 enregistrements par seconde. Nous pouvons fournir au travail de threading plus de ressources à exécuter, ou nous pouvons ajouter suffisamment de partitions pour gérer tous les consommateurs nécessaires pour suivre les producteurs.
Stable mais latence élevée
En quoi ce cas est-il différent du précédent? La latence n'augmente pas, mais reste stable, comme le montre la capture d'écran suivante:
Nous avons constaté que le taux de traitement peut rester stable au même taux d'entrée. Cela signifie que la puissance de traitement du travail est suffisante pour traiter les données d'entrée. Cependant, le temps de traitement pour chaque lot, c'est-à -dire le délai, est toujours de 20 secondes. La principale raison de la latence élevée est trop de données dans chaque lot. On peut généralement réduire la latence en augmentant le parallélisme de ce travail. Après avoir ajouté 10 partitions Kafka et 10 cœurs supplémentaires pour les tâches Spark, nous avons constaté que la latence était d'environ 5 secondes - bien meilleure que 20 secondes.
Utilisez un graphique de durée d'opération pour le dépannage
Le graphique Durée de l'opération affiche le temps passé à effectuer diverses opérations en millisecondes. Ceci est utile pour comprendre la synchronisation de chaque lot et faciliter le dépannage. Utilisons le travail d'amélioration des performances " SPARK-30915 : Évitez de lire le fichier journal des métadonnées lorsque vous recherchez le dernier ID de lot" dans la communauté Apache Spark à titre d'exemple.
Avant cette amélioration, chaque lot suivant après compression prenait plus de temps que les autres lots, lorsque le journal de métadonnées compressé devenait énorme.
Après avoir examiné le code, une lecture inutile du fichier journal compressé a été trouvée et corrigée. Le diagramme de durée de fonctionnement suivant confirme l'effet attendu:
Projets pour le futur
Comme indiqué ci-dessus, la nouvelle interface utilisateur de streaming structuré aidera les développeurs à mieux contrôler leurs travaux de streaming en disposant d'informations beaucoup plus utiles sur les demandes de streaming. En tant que première version, la nouvelle interface utilisateur est toujours en développement et sera améliorée dans les prochaines versions. Plusieurs fonctionnalités peuvent être mises en œuvre dans un avenir pas trop lointain, y compris, mais sans s'y limiter, les suivantes:
- En savoir plus sur l'exécution d'une demande de diffusion en continu: données tardives, filigranes, mesures d'état des données, etc.
- Prise en charge de l'interface utilisateur de streaming structuré sur Spark History Server.
- Des indices plus visibles de comportement inhabituel: latence, etc.
Essayez une nouvelle interface utilisateur
Essayez cette nouvelle interface utilisateur Spark Streaming dans Apache Spark 3.0 dans le nouveau Databricks Runtime 7.1. Si vous utilisez des blocs-notes Databricks, cela vous donnera également un moyen facile d'observer l'état de toute demande de streaming dans le bloc-notes et de gérer vos demandes . Vous pouvez vous inscrire pour un compte Databricks gratuit et commencer en quelques minutes gratuitement, sans aucune information de crédit.
La qualité des données dans DWH est la cohérence de l'entrepôt de données. webinaire gratuit.
Lecture recommandée:
Data Build Tool, ou ce que l'entrepôt de données et le smoothie ont
en commun Delta Lake Dive: Schema Enforcement and Evolution
High Speed ​​Apache Parquet en Python avec Apache Arrow