Bonjour à tous!
Dans un article récent, nous avons discuté de la façon dont nous avons construit notre plate-forme de données. Aujourd'hui, j'aimerais plonger plus profondément dans le « ventre » de notre plate-forme et, en cours de route, vous expliquer comment nous avons résolu l'un des problèmes liés à la variété croissante des sources de données intégrées.
C'est-à-dire que si nous revenons à l'image finale de l'article ci-dessus (je la duplique spécialement pour la rendre plus pratique pour les chers lecteurs), nous parlerons aujourd'hui plus en détail de la mise en œuvre du "côté droit" du schéma - le celui qui se trouve après Apache NiFi.
Pour rappel, notre société dispose de plus de 350 bases de données relationnelles. Naturellement, tous ne sont pas "uniques" et beaucoup sont essentiellement des copies différentes du même système installé dans tous les magasins du réseau commercial, mais il existe toujours un "zoo de diversité". Par conséquent, on ne peut se passer d'un Framework qui simplifie et accélère l'intégration des sources dans la Data Platform.
Le schéma général de transmission des données des sources à la couche ODS de Greenplum à l'aide du cadre que nous avons développé est présenté ci-dessous :
- Kafka AVRO-, Apache NiFi, parquet S3.
«» Spark’ :
Compaction – ( «»), : distinct() coalesce(). S3. parsing' , « »;
Parsing – , . , ( gzip) CSV- S3.
– CSV- ODS- : external table S3 PXF S3 connector, pgsql ODS- Greenplum
Airflow.
DAG’ Airflow . Parsing . , , :
ODS- - ;
Git YAML-:
( : , , S3-, , email ..);
ODS ( , , ODS- ). , ;
, . , , JSON-. , MongoDB MongoDB Kafka source connector Kafka. framework’ . , S3 JSON - " ", parquet Apache NiFi.
Compaction. , «» , :
df = spark.read.format(in_format) \
.options(**in_options) \
.load(path) \
.distinct()
new_df = df.coalesce(div)
new_df.write.mode("overwrite") \
.format(out_format) \
.options(**out_options) \
.save(path)
JSON-, - , JSON’ Spark mergeSchema, .. , . – , - . « ».
-, , , S3. :
JSON- DataFrame , JSON-.
. , :
file1:
{«productId»: 1, «productName»: «ProductName 1», «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
{«productId»: 2, «price»: 10.01, «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
. JSON-, 1 = 1 . , , JSON-, JSON-. JSON- S3 ( " Apache NiFi).
:
#
df = spark.read \
.format("csv") \
.option("sep", "\a") \
.load("file1.json")
# DataFrame
df.printSchema()
root
|-- _c0: string (nullable = true)
#
df.show()
+--------------------+
| _c0|
+--------------------+
|{"productId": 1, ...|
|{"productId": 2, ...|
+--------------------+
JSON CSV, , . , Bell character. DataFrame , dicstinct() coalesce(), . :
# parquet
in_format = "parquet"
in_options = {}
# JSON
in_format = "csv"
in_options = {"sep": "\a"}
DataFrame S3 :
df.write.mode("overwrite") \
.format(out_format) \
.options(**out_options) \
.save(path)
# JSON
out_format = "text"
out_options = {"compression": "gzip"}
# parquet
out_format = input_format
out_options = {"compression": "snappy"}
Parsing. , , : JSON -, parquet, . , JSON- Spark , , JSON- , mergeSchema. . , - «field_1», , , . Spark DataFrame , Parsing, , - - , .
. , :
file1 ( ):
{«productId»: 1, «productName»: «ProductName 1», «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
{«productId»: 2, «price»: 10.01, «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
file2:
{«productId»: 3, «productName»: «ProductName 3», «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5, «package»: [10, 20.5, 30]}}
Spark’ DataFrame:
df = spark.read \
.format("json") \
.option("multiline", "false") \
.load(path)
df.printSchema()
df.show()
( ):
root
|-- dimensions: struct (nullable = true)
| |-- height: double (nullable = true)
| |-- length: long (nullable = true)
| |-- width: long (nullable = true)
|-- price: double (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
|-- tags: array (nullable = true)
| |-- element: string (containsNull = true)
+--------------+-----+---------+-------------+--------------+
| dimensions|price|productId| productName| tags|
+--------------+-----+---------+-------------+--------------+
|[12.5, 10, 12]| null| 1|ProductName 1|[tag 1, tag 2]|
|[12.5, 10, 12]|10.01| 2| null|[tag 1, tag 2]|
+--------------+-----+---------+-------------+--------------+
( ):
root
|-- dimensions: struct (nullable = true)
| |-- height: double (nullable = true)
| |-- length: long (nullable = true)
| |-- package: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- width: long (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
+--------------------+---------+-------------+
| dimensions|productId| productName|
+--------------------+---------+-------------+
|[12.5, 10, [10.0,...| 3|ProductName 3|
+--------------------+---------+-------------+
, Spark . - , , DataFrame null ( price productName ).
, , ( ) ,
root
|-- price: double (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
«- file2», «price» , Spark , «price» DataFrame. parquet- , parquet- AVRO, , , parquet-.
, , , framework’, - JSON’ – JSON- S3.
, JSON- JSON- . JSON’ - , DataFrame , null:
df = spark.read \
.format("json") \
.option("multiline","false") \
.schema(df_schema) \
.load(path)
- YAML- . , Kafka, , Kafka Schema Registry, JSON ( , , Kafka Schema Registry ).
, :
Kafka Schema Registry
pyspark.sql.types.StructType – - :
# 1. Kafka Schema Registry REST API
# 2. schema :
df_schema = StructType.fromJson(schema)
JSON-
, … JSON-, Spark’. JSON file2 . JSON , :
df.schema.json()
{
"fields":
[
{
"metadata": {},
"name": "dimensions",
"nullable": true,
"type":
{
"fields":
[
{"metadata":{},"name":"height","nullable":true,"type":"double"},
{"metadata":{},"name":"length","nullable":true,"type":"long"},
{"metadata":{},"name":"width","nullable":true,"type":"long"}
],
"type": "struct"
}
},
{
"metadata": {},
"name": "price",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "productId",
"nullable": true,
"type": "long"
},
{
"metadata": {},
"name": "productName",
"nullable": true,
"type": "string"
},
{
"metadata": {},
"name": "tags",
"nullable": true,
"type":
{
"containsNull": true,
"elementType": "string",
"type": "array"
}
}
],
"type": "struct"
}
, JSON-.
« , JSON- , Spark’» - … , , , :
DataFrame JSON,
https://github.com/zalando-incubator/spark-json-schema, , Scala, pySpark …
, SchemaConverter. – . , «» - .
, , JSON. DataPlatform : NiFi Kafka, parquet, « » NiFi AVRO-schema, S3. - - -:
, :)
root |-- taskId: string (nullable = true) |-- extOrderId: string (nullable = true) |-- taskStatus: string (nullable = true) |-- taskControlStatus: string (nullable = true) |-- documentVersion: long (nullable = true) |-- buId: long (nullable = true) |-- storeId: long (nullable = true) |-- priority: string (nullable = true) |-- created: struct (nullable = true) | |-- createdBy: string (nullable = true) | |-- created: string (nullable = true) |-- lastUpdateInformation: struct (nullable = true) | |-- updatedBy: string (nullable = true) | |-- updated: string (nullable = true) |-- customerId: string (nullable = true) |-- employeeId: string (nullable = true) |-- pointOfGiveAway: struct (nullable = true) | |-- selected: string (nullable = true) | |-- available: array (nullable = true) | | |-- element: string (containsNull = true) |-- dateOfGiveAway: string (nullable = true) |-- dateOfGiveAwayEnd: string (nullable = true) |-- pickingDeadline: string (nullable = true) |-- storageLocation: string (nullable = true) |-- currentStorageLocations: array (nullable = true) | |-- element: string (containsNull = true) |-- customerType: string (nullable = true) |-- comment: string (nullable = true) |-- totalAmount: double (nullable = true) |-- currency: string (nullable = true) |-- stockDecrease: boolean (nullable = true) |-- offline: boolean (nullable = true) |-- trackId: string (nullable = true) |-- transportationType: string (nullable = true) |-- stockRebook: boolean (nullable = true) |-- notificationStatus: string (nullable = true) |-- lines: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- lineId: string (nullable = true) | | |-- extOrderLineId: string (nullable = true) | | |-- productId: string (nullable = true) | | |-- lineStatus: string (nullable = true) | | |-- lineControlStatus: string (nullable = true) | | |-- orderedQuantity: double (nullable = true) | | |-- confirmedQuantity: double (nullable = true) | | |-- assignedQuantity: double (nullable = true) | | |-- pickedQuantity: double (nullable = true) | | |-- controlledQuantity: double (nullable = true) | | |-- allowedForGiveAwayQuantity: double (nullable = true) | | |-- givenAwayQuantity: double (nullable = true) | | |-- returnedQuantity: double (nullable = true) | | |-- sellingScheme: string (nullable = true) | | |-- stockSource: string (nullable = true) | | |-- productPrice: double (nullable = true) | | |-- lineAmount: double (nullable = true) | | |-- currency: string (nullable = true) | | |-- markingFlag: string (nullable = true) | | |-- operations: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- operationId: string (nullable = true) | | | | |-- type: string (nullable = true) | | | | |-- reason: string (nullable = true) | | | | |-- quantity: double (nullable = true) | | | | |-- dmCodes: array (nullable = true) | | | | | |-- element: string (containsNull = true) | | | | |-- timeStamp: string (nullable = true) | | | | |-- updatedBy: string (nullable = true) | | |-- source: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- type: string (nullable = true) | | | | |-- items: array (nullable = true) | | | | | |-- element: struct (containsNull = true) | | | | | | |-- assignedQuantity: double (nullable = true) |-- linkedObjects: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- objectType: string (nullable = true) | | |-- objectId: string (nullable = true) | | |-- objectStatus: string (nullable = true) | | |-- objectLines: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- objectLineId: string (nullable = true) | | | | |-- taskLineId: string (nullable = true)
, , -, Avro- JSON-. : , «» . , , ( ) , JSON-, Kafka Schema Registry, «, ».
SparkJsonSchemaConverter – , definitions, refs ( ) oneOf. , «» JSON- pyspark.sql.types.StructType
, , Open Source, , , , Open Source . . Open Source , , !
SparkJsonSchemaConverter’ Parsing «» S3: ( ) S3 -:
# JSON
df = spark.read.format(in_format)\
.option("multiline", "false")\
.schema(json_schema) \
.load(path)
# parquet:
df = spark.read.format(in_format)\
.load(path)
, DataFrame’ CSV-.
framework’ Data Platform JSON- . :
4 JSON-!
« » framework’, , «» .