Comment utilisons-nous Kafka Streams dans l'équipe d'entrepôt de données Vivid Money ?

Hey! Je m'appelle Andrey Serebryansky, je suis ingénieur de données dans l'équipe Data Operations. Notre équipe est chargée de remplir notre référentiel Snowflake, ainsi que de s'assurer que le reste des équipes dispose de données en temps réel. Par exemple, le flux des transactions (il s'agit des achats des clients, de leurs virements, des cashbacks reçus par eux) est renseigné en fonction de nos données.





Pour toutes ces tâches, nous utilisons Kafka, et surtout Kafka Streams. Aujourd'hui, je vais parler des tâches pour lesquelles Kafka Streams peut être utilisé et montrer le code de nos exemples simples. Cela sera utile pour ceux qui utilisent Kafka mais n'ont pas encore essayé Kafka Streams. Si vous souhaitez conserver l'état lors du traitement des sujets Kafka, ou si vous recherchez une syntaxe simple pour enrichir certains sujets avec des informations provenant d'autres, je vais vous montrer aujourd'hui comment vous pouvez le faire facilement et de manière pratique.





Aperçu de l'article

  1. Un peu sur Kafka Streams





  2. Pourquoi avons-nous besoin de Kafka Streams





  3. Cas n°1. Enrichir les achats de nos clients avec des informations sur la marque





  4. Cas numéro 2. Nous transférons les données client de l'équipe Origination à notre stockage





  5. Comment commencer tout cela ?





  6. Un peu sur l'évolutivité de Kafka Streams





  7. conclusions





Un peu sur Kafka Streams

Kafka Streams - Java. Kafka Java/Scala.





exactly once processing kafka transactions.





Kafka Streams , stateful (, ).





Kafka Streams?

: , - , , , , .





, - . , , . , , , , , .





: , . , , , .





Nous récupérons séquentiellement les données de différentes sources, en attendant si quelque chose ne va pas dans la source.
, , -

, .





Trop d'amis

, , , . . Kafka Streams. , ,





Kafka Streams récupère les données nécessaires à l'avance
Kafka Streams

, .





№1.

, . (brand_id) ( ).





Les meilleures marques

.





Sujet d'autorisation

.









builder.streams("authorization-events")
    .join(
        builder.globalTable("brands"), 
        auth -> auth.get("brand_id"), // ,      
        (brand, auth) -> auth.set("brandName", brand.get("name")) //  
    );

      
      



builder? . :





import org.apache.kafka.streams.StreamsBuilder;
...

StreamsBuilder builder = new StreamsBuilder();

      
      



, Kafka Streams id ( , ).





id ?

Kafka Streams , , - . builder.globalTable(topicName)



.





. , , . , . , , .





https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality
https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality

, Kafka Streams .





№2. Origination

Vivid Money, , . Origination - Vivid.





Les informations sur le prénom et le nom sont transférées dans la base de données de l'équipe Origination
Origination

Kafka Connect open-source dynamodb JSON.





Nous prenons les données de dynamodb à notre kafka
dynamodb

, . , , . Apache AVRO. .





Avro
{
  "type": "record",
  "name": "OriginationClient",
  "namespace": "datahub",
  "fields": [
    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "lastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    ...
  ]
}

      
      



, :





Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);

builder.stream("origination-json-topic")
    .mapValues(val -> avroConverter.convert(val))
    .to("origination-avro-topic");

      
      



AvroConverter - , . open source https://github.com/allegro/json-avro-converter . .





, . , , , . (diff) . , .





, . . . , Kafka Streams . , , .





:





import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...

var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore**("state-store"**), //                
  Serdes.Bytes(), //       
    new GenericAvroSerde() //       
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") //  ,  
    .to(outputTopic);

      
      



ChangeTransformer :





public class ChangeTransformer implements Transformer {
  private KeyValueStore<Bytes, GenericRecord> stateStore;

  @Override
  public void init(ProcessorContext processorContext) {
     this.stateStore = processorContext.getStateStore("state-store");
  }
  @Override
  public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
    GenericRecord prevState = stateStore.get(recordKey);
    return extractDiff(prevState, record);
  }
  ...
}

      
      



?

StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
        .filter(...)
        .map(...)
        .to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); // 
...
kafkaStreams.stop();

      
      



Kafka Streams

Kafka Streams . . 16 , 16 . , .





, state-store ( ChangeTransformer-), , ! .





: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model





Kafka Streams :





  • stateful (join, get previous state). , .





  • . map, filter, join DSL. , transform()



    . ChangeTransformer-, .





  • . . .





P.S. ) , !








All Articles