Gestion des attributs d'entité dans Apache Kafka

introduction



Lorsque vous travaillez sur des problèmes d'apprentissage automatique avec des données en ligne, il est nécessaire de rassembler diverses entités en une seule pour une analyse et une évaluation plus poussées. Le processus de collecte doit être pratique et rapide. Il devrait aussi souvent permettre une transition sans heurt du développement à l'utilisation industrielle sans effort supplémentaire ni travail de routine. Vous pouvez utiliser l'approche du magasin de fonctionnalités pour résoudre ce problème. Cette approche est décrite dans de nombreux détails ici: Meet Michelangelo: Uber's Machine Learning Platform . Cet article décrit comment interpréter la solution de gestion des fonctionnalités spécifiée en tant que prototype.







Feature Store pour le streaming en ligne



Feature Store peut être considéré comme un service qui doit exécuter ses fonctions strictement selon ses spécifications. Avant de définir cette spécification, un exemple simple doit être démonté.







Exemple



Laissez les entités suivantes être données.







Un film qui a un identifiant et un titre.







Classement des films, qui possède également son propre identifiant, son identifiant de film et sa valeur de notation La cote change avec le temps.







Source de notation, qui a également sa propre note. Et cela change avec le temps.

Et vous devez combiner ces entités en une seule.







Voici ce qui se passe.







image

Diagramme d'entité







Comme vous pouvez le voir, la fusion est basée sur des clés d'entité. Ceux. toutes les classifications de films sont recherchées pour un film, et toutes les classifications de sources pour une classification de film.







Généralisation de l'exemple



, .







kafka-, : A, B… NN.

: AB, BCD… NM.

: Feature Stream Engine.







Feature Stream Engine kafka-, Feature Stream Store Feature Stream Center, .







image

Feature Stream Engine







Feature Stream Store



, .







– (feature).







, , .







.







Feature Stream Center



, , .







Feature Stream Engine



Feature Stream Engine , .







image

Feature Stream Engine







Feature Stream Engine



Feature Stream Engine , .







Feature Stream Engine .







.

kafka.

.

( ).

, .







image

Feature Stream Engine









.







.







, ("configration.properties").







.







topic- kafka. “,”.

. “,”.

topic-.







, .







public static FeaturesDescriptor createFromProperties(Properties properties) {
    String sources = properties.getProperty(FEATURES_DESCRIPTOR_FEATURE_DESCRIPTORS_SOURCES);
    String keys = properties.getProperty(FEATURES_DESCRIPTOR_FEATURE_DESCRIPTORS_KEYS);
    String sinkSource = properties.getProperty(FEATURES_DESCRIPTOR_SINK_SOURCE);
    String[] sourcesArray = sources.split(",");
    String[] keysArray = keys.split(",");
    List<FeatureDescriptor> featureDescriptors = new ArrayList<>();
    for (int i = 0; i < sourcesArray.length; i++) {
        FeatureDescriptor featureDescriptor =
                new FeatureDescriptor(sourcesArray[i], keysArray[i]);
        featureDescriptors.add(featureDescriptor);
    }
    return new FeaturesDescriptor(featureDescriptors, sinkSource);
}
      
      





public static class FeatureDescriptor {
    public final String source;
    public final String key;

    public FeatureDescriptor(String source, String key) {
        this.source = source;
        this.key = key;
    }
}
      
      





public static class FeaturesDescriptor {
    public final List<FeatureDescriptor> featureDescriptors;
    public final String sinkSource;

    public FeaturesDescriptor(List<FeatureDescriptor> featureDescriptors, String sinkSource) {
        this.featureDescriptors = featureDescriptors;
        this.sinkSource = sinkSource;
    }
}
      
      





.







void buildStreams(StreamsBuilder builder)
      
      





topic-, , , .







Serde<String> stringSerde = Serdes.String();

List<KStream<String, String>> streams = new ArrayList<>();

for (FeatureDescriptor featureDescriptor : featuresDescriptor.featureDescriptors) {
    KStream<String, String> stream =
            builder.stream(featureDescriptor.source, Consumed.with(stringSerde, stringSerde))
                    .map(new KeyValueMapperSimple(featureDescriptor.key));
    streams.add(stream);
}
      
      





.







KStream<String, String> pref = streams.get(0);
for (int i = 1; i < streams.size(); i++) {
    KStream<String, String> cur = streams.get(i);
    pref = pref.leftJoin(cur,
            new ValueJoinerSimple(),
            JoinWindows.of(Duration.ofSeconds(1)),
            StreamJoined.with(
                    Serdes.String(),
                    Serdes.String(),
                    Serdes.String())
    );
}
      
      





topic.







pref.to(featuresDescriptor.sinkSource);
      
      





.







public void buildStreams(StreamsBuilder builder) {
    Serde<String> stringSerde = Serdes.String();

    List<KStream<String, String>> streams = new ArrayList<>();

    for (FeatureDescriptor featureDescriptor : featuresDescriptor.featureDescriptors) {
        KStream<String, String> stream =
                builder.stream(featureDescriptor.source, Consumed.with(stringSerde, stringSerde))
                        .map(new KeyValueMapperSimple(featureDescriptor.key));
        streams.add(stream);
    }

    if (streams.size() > 0) {
        if (streams.size() == 1) {
            KStream<String, String> stream = streams.get(0);
            stream.to(featuresDescriptor.sinkSource);
        } else {
            KStream<String, String> pref = streams.get(0);
            for (int i = 1; i < streams.size(); i++) {
                KStream<String, String> cur = streams.get(i);
                pref = pref.leftJoin(cur,
                        new ValueJoinerSimple(),
                        JoinWindows.of(Duration.ofSeconds(1)),
                        StreamJoined.with(
                                Serdes.String(),
                                Serdes.String(),
                                Serdes.String())
                );
            }
            pref.to(featuresDescriptor.sinkSource);
        }
    }
}
      
      





.







void run(Properties config)
      
      





( ).







FeaturesStream featuresStream = new FeaturesStream(config);
      
      





kafka.







StreamsBuilder builder = new StreamsBuilder();

featuresStream.buildStreams(builder);
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, featuresStream.buildStreamsProperties());
      
      





.







streams.start();
      
      





.







public static void run(Properties config) {
    StreamsBuilder builder = new StreamsBuilder();
    FeaturesStream featuresStream = new FeaturesStream(config);
    featuresStream.buildStreams(builder);
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, featuresStream.buildStreamsProperties());
    CountDownLatch latch = new CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        streams.close();
        latch.countDown();
    }));
    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
      
      





.







java -jar features-stream-1.0.0.jar -c plain.properties
      
      





: Java 1.8.

: kafka 2.6.0, jsoup 1.13.1.









. .







Premièrement: vous permet de construire rapidement un sujet à union.

Deuxièmement: vous permet de commencer rapidement à fusionner dans différents environnements.







Il est à noter que la solution impose une contrainte sur la structure des données d'entrée. À savoir, le sujet et doit avoir une structure tabulaire. Pour surmonter cette limitation, vous pouvez introduire une couche supplémentaire qui vous permettra de réduire diverses structures à une structure tabulaire.







Pour la mise en œuvre industrielle de fonctionnalités complètes, vous devez prêter attention à une fonctionnalité très puissante et, surtout, flexible: KSQL .







Liens et ressources



Code source ;

Rencontrez Michelangelo: la plate-forme d'apprentissage automatique d'Uber .








All Articles