Apache Kafka et tests avec Kafka Server


Il existe différentes manières d'écrire des tests à l'aide d'Apache Kafka. Par exemple, TestContainers et EmbeddedKafka peuvent être utilisés . Vous pouvez lire à ce sujet, par exemple, ici: Les pièges du test de Kafka Streams . Mais il existe également une option pour écrire des tests à l'aide de KafkaServer.

Qu'est-ce qui sera testé?

Supposons que vous ayez besoin de développer un service d'envoi de messages via différents canaux: email, télégramme, etc.

Soit le nom du service: SenderService.

Le service doit: écouter le canal spécifié, sélectionner les messages dont il a besoin sur le canal, analyser les messages et les envoyer sur le canal souhaité pour la livraison finale des messages.

Pour tester le service, vous devez composer un message à envoyer à l'aide du canal d'envoi de courrier et vous assurer que le message a été envoyé au canal final.

Bien sûr, dans les applications du monde réel, les tests seront plus difficiles. Mais pour illustrer l'approche choisie, un tel test sera suffisant.

Le service et le test sont implémentés en utilisant: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.

Un service

Le service pourra démarrer et arrêter son travail.

void start()

void stop()

Au début, vous devez définir au moins les paramètres suivants:

String bootstrapServers
String senderTopic
EmailService emailService

bootstrapServers – kafka.

senderTopic – , .

emailService – .


«», , . «» . «» : Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.

Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
    SenderConsumerLoop senderConsumerLoop =
            new SenderConsumerLoop(

«», .

«» . .

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    for (AutoCloseable autoCloseable : closeables) {
        try {
        } catch (Exception e) {
    try {
        senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {



«» :

void run()

void close()

: run.

public void run() {
    kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
    while (true) {

«kafka-». «kafka-» . . .

json- , , .


  "subject": {
    "subject_type": "send"
  "body": {
    "method": "email",
    "recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
    "title": "42",
    "message": "73"

subject_type — . «send».

method – . «email» — .

recipients – .

title – .

message – .


void calculate(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {


void calculate(ConsumerRecord<String, String> record) {
            JSONParser jsonParser = new JSONParser();
            Object parsedObject = null;
            try {
                parsedObject = jsonParser.parse(record.value());
            } catch (ParseException e) {
            if (parsedObject instanceof JSONObject) {
                JSONObject jsonObject = (JSONObject) parsedObject;
                JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
                String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
                if (SEND.equals(subjectType)) {
                    JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);


void calculate(JSONObject jsonBody) {
    String method = jsonBody.get(METHOD).toString();
    if (EMAIL_METHOD.equals(method)) {
        String recipients = jsonBody.get(RECIPIENTS).toString();
        String title = jsonBody.get(TITLE).toString();
        String message = jsonBody.get(MESSAGE).toString();
        sendEmail(recipients, title, message);


void sendEmail(String recipients, String title, String message) {
    tasksExecutorService.submit(() -> emailService.send(recipients, title, message));




static KafkaConsumer<String, String> createKafkaConsumerStringString(
        String bootstrapServers,
        String clientId,
        String groupId
) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(properties);


interface EmailService {
    void send(String recipients, String title, String message);





«kafka-». .

public class SenderServiceTest {
    void consumeEmail() throws InterruptedException {
        String brokerHost = "";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {


. «kafka-». «kafka-» . .

«mock» :

SenderService.EmailService emailService = mock(SenderService.EmailService.class);


SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);


String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";


kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));



, :

verify(emailService).send(recipients, title, message);




public class SenderFactory {
    public static final String SUBJECT = "subject";
    public static final String SUBJECT_TYPE = "subject_type";
    public static final String BODY = "body";
    public static final String METHOD = "method";
    public static final String EMAIL_METHOD = "email";
    public static final String RECIPIENTS = "recipients";
    public static final String TITLE = "title";
    public static final String MESSAGE = "message";
    public static final String SEND = "send";

    public static String key() {
        return UUID.randomUUID().toString();

    public static String createMessage(String method, String recipients, String title, String message) {
        Map<String, Object> map = new HashMap<>();
        Map<String, Object> subject = new HashMap<>();
        Map<String, Object> body = new HashMap<>();
        map.put(SUBJECT, subject);
        subject.put(SUBJECT_TYPE, SEND);
        map.put(BODY, body);
        body.put(METHOD, method);
        body.put(RECIPIENTS, recipients);
        body.put(TITLE, title);
        body.put(MESSAGE, message);
        return JSONObject.toJSONString(map);



void start()

void close()

void createTopic(String topic)

«start» .

Créez un "gardien de zoo" et enregistrez son adresse:

zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();

Créez un client de gardien de zoo:

zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);

Définition des propriétés du serveur:

Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
    brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
    throw new RuntimeException(e);
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);

Création du serveur:

kafkaServer = TestUtils.createServer(config, new MockTime());


public void start() {
    zkServer = new EmbeddedZookeeper();
    String zkConnect = zkHost + ":" + zkServer.port();
    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
    zkUtils = ZkUtils.apply(zkClient, false);
    Properties brokerProps = new Properties();
    brokerProps.setProperty("zookeeper.connect", zkConnect);
    brokerProps.setProperty("broker.id", "0");
    try {
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
    } catch (IOException e) {
        throw new RuntimeException(e);
    brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
    brokerProps.setProperty("offsets.topic.replication.factor", "1");
    KafkaConfig config = new KafkaConfig(brokerProps);
    kafkaServer = TestUtils.createServer(config, new MockTime());

Arrêt du service:

public void close() {

Création de thème:

public void createTopic(String topic) {
            zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);


En conclusion, il faut noter que le code donné ici n'illustre que la méthode choisie.

Pour créer et tester des services à l'aide de "kafka", vous pouvez vous référer à la ressource suivante:


Liens et ressources

La source

Code de test avec "serveur kafka"

