Schéma SparkEvolution en pratique

Chers lecteurs, bonne journée!



Dans cet article, le principal consultant de la ligne métier Big Data Solutions chez Neoflex, décrit en détail les options de création de vitrines à structure variable à l'aide d'Apache Spark.



Dans le cadre d'un projet d'analyse de données, la tâche de construire des marchés à partir de données peu structurées se pose souvent.



Il s'agit généralement de journaux ou de réponses de divers systèmes, enregistrés au format JSON ou XML. Les données sont téléchargées sur Hadoop, vous devez ensuite créer une vitrine à partir d'elles. Nous pouvons organiser l'accès à la vitrine créée, par exemple, via Impala.



Dans ce cas, la disposition de la vitrine cible est auparavant inconnue. De plus, le schéma ne peut toujours pas être établi à l'avance, car il dépend des données, et nous avons affaire à ces données très faiblement structurées.



Par exemple, aujourd'hui, la réponse suivante est enregistrée:



{source: "app1", error_code: ""}


et demain la réponse suivante vient du même système:



{source: "app1", error_code: "error", description: "Network error"}


En conséquence, un autre champ doit être ajouté à la vitrine - la description, et personne ne sait s'il viendra ou non.



La tâche de créer un magasin sur de telles données est assez standard, et Spark dispose d'un certain nombre d'outils pour cela. JSON et XML sont pris en charge pour l'analyse des données brutes, et la prise en charge de schemaEvolution est fournie pour un schéma précédemment inconnu.



À première vue, la solution semble simple. Nous devons prendre un dossier avec JSON et le lire dans un dataframe. Spark créera un schéma et transformera les données imbriquées en structures. Ensuite, tout doit être sauvegardé dans le parquet, qui est également pris en charge dans Impala, en enregistrant la vitrine dans le métastore Hive.



Tout semble simple.



Cependant, les brefs exemples de la documentation ne permettent pas de savoir quoi faire avec un certain nombre de problèmes dans la pratique.



La documentation décrit une approche non pas pour créer une vitrine, mais pour lire JSON ou XML dans un dataframe.



À savoir, il est simplement indiqué comment lire et analyser JSON:



df = spark.read.json(path...)


Cela suffit pour rendre les données disponibles à Spark.



En pratique, le scénario est beaucoup plus compliqué que la simple lecture de fichiers JSON à partir d'un dossier et la création d'un dataframe. La situation ressemble à ceci: il y a déjà une certaine vitrine, de nouvelles données arrivent chaque jour, il faut les ajouter à la vitrine, sans oublier que le schéma peut être différent.



Le schéma habituel de création d'une vitrine est le suivant:



Étape 1. Les données sont chargées dans Hadoop, suivies d'un rechargement quotidien et ajoutées à une nouvelle partition. Il s'avère que le dossier avec les données initiales est partitionné par jours.



Étape 2.Lors du chargement d'initialisation, ce dossier est lu et analysé par Spark. La trame de données résultante est enregistrée dans un format disponible pour l'analyse, par exemple dans le parquet, qui peut ensuite être importée dans Impala. Cela crée une vitrine cible avec toutes les données qui se sont accumulées jusqu'à présent.



Étape 3. Un téléchargement est créé qui mettra à jour la vitrine tous les jours.

Se pose la question du chargement incrémental, la nécessité de partitionner la vitrine, et la question de la prise en charge du schéma général de la vitrine.



Donnons un exemple. Disons que la première étape de création du stockage est mise en œuvre et que l'exportation des fichiers JSON vers un dossier est configurée.



Ce n'est pas un problème de créer un dataframe à partir d'eux, puis de l'enregistrer en tant que vitrine. C'est la toute première étape que vous pouvez facilement trouver dans la documentation Spark:



df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)


Tout semble aller bien.



Nous avons lu et analysé le JSON, puis nous sauvegardons le dataframe en tant que parquet, en l'enregistrant auprès de Hive de n'importe quelle manière pratique:



df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')


Nous avons une vitrine.



Mais, le lendemain, de nouvelles données de la source ont été ajoutées. Nous avons un dossier avec JSON et une vitrine créée à partir de ce dossier. Après avoir chargé le segment de données suivant à partir de la source, le magasin de données manque de données pendant un jour.



Une solution logique serait de partitionner la vitrine par jour, ce qui permettra d'ajouter une nouvelle partition chaque jour suivant. Le mécanisme pour cela est également bien connu, Spark vous permet d'écrire des partitions séparément.



Tout d'abord, nous effectuons le chargement d'initialisation, en sauvegardant les données comme décrit ci-dessus, en ajoutant uniquement le partitionnement. Cette action est appelée initialisation de la vitrine et n'est effectuée qu'une seule fois:



df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)


Le lendemain, nous ne chargeons qu'une nouvelle partition:



df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")


Il ne reste plus qu'à se réinscrire auprès de Hive pour mettre à jour le schéma.

Cependant, c'est là que les problèmes surgissent.



Premier problème. Tôt ou tard, le parquet résultant ne peut pas être lu. Cela a à voir avec la façon dont le parquet et le JSON abordent différemment les champs vides.



Prenons une situation typique. Par exemple, JSON arrive hier:



 1: {"a": {"b": 1}},


et aujourd'hui, le même JSON ressemble à ceci:



 2: {"a": null}


Disons que nous avons deux partitions différentes avec une ligne chacune.

Lorsque nous lirons l'intégralité des données source, Spark sera en mesure de déterminer le type et de comprendre que "a" est un champ de type "structure", avec un champ imbriqué "b" de type INT. Mais, si chaque partition a été enregistrée séparément, un parquet avec des schémas de partition incompatibles est obtenu:



df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)


Cette situation est bien connue, donc une option a été spécialement ajoutée pour supprimer les champs vides lors de l'analyse des données initiales:



df = spark.read.json("...", dropFieldIfAllNull=True)


Dans ce cas, le parquet sera composé de cloisons pouvant être lues ensemble.

Bien que ceux qui ont fait cela dans la pratique riront amèrement. Pourquoi? Parce que deux autres situations sont susceptibles de se produire. Ou trois. Ou quatre. Le premier, qui apparaîtra presque certainement, est que les types numériques auront un aspect différent dans différents fichiers JSON. Par exemple, {intField: 1} et {intField: 1.1}. Si de tels champs sont trouvés dans une partie, la fusion de schéma lira tout correctement, conduisant au type le plus précis. Mais si dans différent, alors l'un aura intField: int, et l'autre intField: double.



Il existe l'indicateur suivant pour gérer cette situation:



df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)


Nous avons maintenant un dossier dans lequel se trouvent les partitions, qui peuvent être lues dans une seule trame de données et un parquet valide pour toute la vitrine. Oui? Non.



N'oubliez pas que nous avons enregistré la table dans Hive. Hive n'est pas sensible à la casse dans les noms de champ, tandis que parquet est sensible à la casse. Par conséquent, les partitions avec les schémas: field1: int et Field1: int sont les mêmes pour Hive, mais pas pour Spark. N'oubliez pas de mettre les noms des champs en minuscules.



Après cela, tout semble aller bien.



Cependant, pas si simple. Un deuxième problème, également bien connu, se pose. Étant donné que chaque nouvelle partition est enregistrée séparément, les fichiers du service Spark seront dans le dossier de partition, par exemple, l'indicateur de réussite de l'opération _SUCCESS. Cela jettera une erreur lors de la tentative de parquet. Pour éviter cela, vous devez configurer la configuration en désactivant Spark d'ajouter des fichiers de service au dossier:



hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


Il semble que maintenant, chaque jour, une nouvelle partition de parquet est ajoutée au dossier de la vitrine cible, où les données analysées pour la journée sont stockées. Nous avons pris soin à l'avance qu'il n'y ait pas de partitions avec un conflit de type de données.



Mais, devant nous, se trouve le troisième problème. Maintenant, le schéma général n'est pas connu, de plus, dans Hive, la table avec le mauvais schéma, car chaque nouvelle partition, très probablement, a introduit une distorsion dans le schéma.



Vous devez réenregistrer la table. Cela peut être fait simplement: relisez le parquet de la vitrine, prenez le schéma et créez un DDL basé sur celui-ci, avec lequel réenregistrez le dossier dans Hive en tant que table externe, en mettant à jour le schéma de la vitrine cible.



Nous sommes confrontés à un quatrième problème. La première fois que nous avons enregistré la table, nous nous sommes appuyés sur Spark. Maintenant, nous le faisons nous-mêmes et vous devez vous rappeler que les champs de parquet peuvent commencer par des caractères qui ne sont pas valides pour Hive. Par exemple, Spark jette des lignes qu'il n'a pas pu analyser dans le champ "corrupt_record". Un tel champ ne peut pas être inscrit auprès de Hive sans échapper.



Sachant cela, nous obtenons le schéma:



f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)


Code ("_corrupt_record", "` _corrupt_record` ") +" "+ f [1] .replace (": "," `:"). Replace ("<", "<` "). Replace (", " , ",` "). replace (" array <`", "array <") rend DDL sûr, c'est-à-dire au lieu de:



create table tname (_field1 string, 1field string)


Avec des noms de champ tels que "_field1, 1field", un DDL sécurisé est créé où les noms de champ sont échappés: create table `tname` (chaîne` _field1`, chaîne `1field`).



La question se pose: comment obtenir correctement le dataframe avec le schéma complet (en code pf)? Comment puis-je obtenir ce pf? C'est le cinquième problème. Relire le schéma de toutes les partitions du dossier avec les fichiers parquet de la vitrine cible? C'est la méthode la plus sûre, mais la plus difficile.



Le schéma est déjà dans Hive. Vous pouvez obtenir un nouveau schéma en combinant le schéma de la table entière et la nouvelle partition. Vous devez donc prendre le schéma de table de Hive et le combiner avec le nouveau schéma de partition. Cela peut être fait en lisant les métadonnées de test de Hive, en les enregistrant dans un dossier temporaire et en lisant les deux partitions avec Spark en même temps.



En gros, vous avez tout ce dont vous avez besoin: le schéma de table d'origine dans Hive et une nouvelle partition. Nous avons également les données. Il ne reste plus qu'à obtenir un nouveau schéma qui combine le schéma de la vitrine et les nouveaux champs de la partition créée:



from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")


Ensuite, nous créons le DDL pour enregistrer la table, comme dans l'extrait de code précédent.

Si toute la chaîne fonctionne correctement, c'est-à-dire qu'il y avait une charge d'initialisation et que dans Hive il y a une table correctement créée, nous obtenons un schéma de table mis à jour.



Et le dernier problème est que vous ne pouvez pas simplement ajouter une partition à la table Hive, car elle sera cassée. Vous devez forcer Hive à corriger la structure de la partition:



from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)


La simple tâche de lire JSON et de créer une vitrine à partir de celui-ci se traduit par la résolution d'un certain nombre de difficultés implicites, pour lesquelles vous devez rechercher des solutions séparément. Bien que ces solutions soient simples, elles prennent beaucoup de temps à trouver.



Pour mettre en œuvre la construction de la vitrine, j'ai dû:



  • Ajouter des partitions à la vitrine, en supprimant les fichiers de service
  • Traiter les champs vides dans les données d'origine que Spark a tapées
  • Caster des types simples en chaîne
  • Convertir les noms de champs en minuscules
  • Vidage de données séparé et enregistrement de table dans Hive (création DDL)
  • N'oubliez pas d'échapper les noms de champ, qui peuvent ne pas être compatibles avec Hive
  • Apprenez à mettre à jour l'enregistrement d'une table dans Hive


En résumé, on constate que la décision de construire des vitrines cache de nombreux écueils. Par conséquent, si des difficultés de mise en œuvre surviennent, il est préférable de contacter un partenaire expérimenté avec une expertise réussie.



Merci d'avoir lu cet article, nous espérons que vous trouverez l'information utile.



All Articles