Dans cet article, je veux expliquer un peu plus en détail comment le mécanisme de validation automatique fonctionne pour les écouteurs dans la bibliothèque kafka-clients (considérez la version 2.6.0)
Dans la documentation, nous pouvons trouver la formulation suivante décrivant le fonctionnement de l'auto-commit:
La validation automatique fonctionne essentiellement comme un cron avec une période définie via la propriété de configuration auto.commit.interval.ms. Si le consommateur plante, après un redémarrage ou un rééquilibrage, la position de toutes les partitions appartenant au consommateur en panne sera réinitialisée au dernier offset validé.
La documentation java pour KafkaConsumer contient à son tour la description suivante:
Le consommateur peut soit commettre automatiquement des compensations périodiquement; ou il peut choisir de contrôler cette position validée manuellement en appelant l'une des API de validation (par exemple commitSync et commitAsync).
De ces formulations, une idée fausse peut surgir selon laquelle un commit d'offset automatique non bloquant se produit en arrière-plan et il n'est pas tout à fait clair comment cela se rapporte au processus de réception de messages par un consommateur spécifique et, surtout, quelles garanties de livraison avons-nous. ?
Examinons de plus près le mécanisme de réception des messages par l'auditeur avec le paramètre enable.auto.commit = true en utilisant l'exemple de l'implémentation de la classe KafkaConsumer de la bibliothèque org.apache.kafka: kafka-clients: 2.6.0
Pour ce faire, considérons l'exemple donné dans la documentation java KafkaConsumer :
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Comment l'auto-commit se produit-il dans ce cas? La réponse doit être trouvée dans la méthode elle-même pour recevoir les nouveaux messages.
consumer.poll(Duration.ofMillis(100));
. KafkaConsumer auto-commit enable.auto.commit auto.commit.interval.ms ConsumerCoordinator , auto-commit.
maybeAutoCommitOffsetsAsync
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}
enable.auto.commit = true auto.commit.interval.ms , , ( doAutoCommitOffsetsAsync)
private void doAutoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
if (exception != null) {
if (exception instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
exception);
nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
} else {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
}
} else {
log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
}
});
}
poll KafkaConsumer. updateAssignmentMetadataIfNeeded, poll ConsumerCoordinator, , maybeAutoCommitOffsetsAsync
poll KafkaConsumer:
offset
.
KafkaConsumer , .
.1 enable.auto.commit = true auto.commit.interval.ms. .. poll() 3 , auto.commit.interval.ms=6000, .
? “at least once delivery”, .