Hey! Je m'appelle Andrey Serebryansky, je suis ingénieur de données dans l'équipe Data Operations. Notre équipe est chargée de remplir notre référentiel Snowflake, ainsi que de s'assurer que le reste des équipes dispose de données en temps réel. Par exemple, le flux des transactions (il s'agit des achats des clients, de leurs virements, des cashbacks reçus par eux) est renseigné en fonction de nos données.
Pour toutes ces tâches, nous utilisons Kafka, et surtout Kafka Streams. Aujourd'hui, je vais parler des tâches pour lesquelles Kafka Streams peut être utilisé et montrer le code de nos exemples simples. Cela sera utile pour ceux qui utilisent Kafka mais n'ont pas encore essayé Kafka Streams. Si vous souhaitez conserver l'état lors du traitement des sujets Kafka, ou si vous recherchez une syntaxe simple pour enrichir certains sujets avec des informations provenant d'autres, je vais vous montrer aujourd'hui comment vous pouvez le faire facilement et de manière pratique.
Aperçu de l'article
-
Pourquoi avons-nous besoin de Kafka Streams
Cas n°1. Enrichir les achats de nos clients avec des informations sur la marque
Cas numéro 2. Nous transférons les données client de l'équipe Origination à notre stockage
-
Un peu sur l'évolutivité de Kafka Streams
-
Un peu sur Kafka Streams
Kafka Streams - Java. Kafka Java/Scala.
exactly once processing kafka transactions.
Kafka Streams , stateful (, ).
Kafka Streams?
: , - , , , , .
, - . , , . , , , , , .
: , . , , , .

, .

, , , . . Kafka Streams. , ,

, .
№1.
, . (brand_id) ( ).

.

.
builder.streams("authorization-events")
.join(
builder.globalTable("brands"),
auth -> auth.get("brand_id"), // ,
(brand, auth) -> auth.set("brandName", brand.get("name")) //
);
builder? . :
import org.apache.kafka.streams.StreamsBuilder;
...
StreamsBuilder builder = new StreamsBuilder();
, Kafka Streams id ( , ).
id ?
Kafka Streams , , - . builder.globalTable(topicName)
.
. , , . , . , , .

, Kafka Streams .
№2. Origination
Vivid Money, , . Origination - Vivid.

Kafka Connect open-source dynamodb JSON.

, . , , . Apache AVRO. .
Avro
{
"type": "record",
"name": "OriginationClient",
"namespace": "datahub",
"fields": [
{
"name": "firstName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "lastName",
"type": [
"null",
"string"
],
"default": null
},
...
]
}
, :
Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);
builder.stream("origination-json-topic")
.mapValues(val -> avroConverter.convert(val))
.to("origination-avro-topic");
AvroConverter - , . open source https://github.com/allegro/json-avro-converter . .
, . , , , . (diff) . , .
, . . . , Kafka Streams . , , .
:
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...
var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore**("state-store"**), //
Serdes.Bytes(), //
new GenericAvroSerde() //
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") // ,
.to(outputTopic);
ChangeTransformer :
public class ChangeTransformer implements Transformer {
private KeyValueStore<Bytes, GenericRecord> stateStore;
@Override
public void init(ProcessorContext processorContext) {
this.stateStore = processorContext.getStateStore("state-store");
}
@Override
public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
GenericRecord prevState = stateStore.get(recordKey);
return extractDiff(prevState, record);
}
...
}
?
StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
.filter(...)
.map(...)
.to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); //
...
kafkaStreams.stop();
Kafka Streams
Kafka Streams . . 16 , 16 . , .
, state-store ( ChangeTransformer-), , ! .
: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model
Kafka Streams :
stateful (join, get previous state). , .
. map, filter, join DSL. ,
transform()
. ChangeTransformer-, .
. . .
P.S. ) , !