Running Kafka in test scenarios has become very convenient thanks to Testcontainers and the enhanced support in Spring Boot 3.1 with the @ServiceConnection annotation. However, writing and maintaining integration tests with Kafka remains a challenge. This article describes an approach that significantly simplifies the testing process by ensuring test isolation and providing a set of tools to achieve it. Once isolation is in place, Kafka tests can be organized so that the verification stage has full access to all messages produced during the test, which removes the need for forced waits such as Thread.sleep().
As detailed in the article Eradicating Non-Determinism in Tests, clear control over the testing environment is crucial for the reliable execution of tests. This ensures that each test begins from a known state. An example could be a situation where one test creates data in a database and fails to clean it up afterward, negatively impacting the execution of subsequent tests that expect a different database state.
Various methods can be applied to achieve test isolation, including:
Restoring Kafka's initial environment state from scratch for each test scenario can be achieved by restarting Kafka. This option is straightforward in terms of implementation but costly in terms of startup time. There are methods to speed up this process (for more details, you can read about running Kafka on GraalVM), but this article suggests considering an option where we work with a common Kafka instance across all test scenarios.
Below is a diagram illustrating the testing approach:
The key feature of the proposed solution is the strict division of test code into phases corresponding to the Arrange-Act-Assert pattern. You can read more about this approach in the article Ordering Chaos: Arranging HTTP Request Testing in Spring.
To achieve isolation, it is critically important to maintain the following relationships between the key elements of the scheme (numbers correspond to those indicated in the diagram):
Regarding integration with Kafka, it is critical to ensure that all consumers are ready to receive messages. This verification is implemented in the pw.avvero.emk.KafkaSupport#waitForPartitionAssignment method. The solution is based on the original ContainerTestUtils from the org.springframework.kafka:spring-kafka-test library, modified for the described usage scenario. The method guarantees that at least one partition is assigned to each Kafka consumer. This implies a limitation: there must be only one partition per topic in the test environment. The limitation follows from the current implementation of the method and can be lifted.
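The readiness check described above comes down to polling a condition until it holds or a deadline passes. The following is a minimal sketch of that idea using only the JDK. The class name Await and its until method are hypothetical and are not part of KafkaSupport or spring-kafka-test; in a real test the condition would be something like "every listener container reports at least one assigned partition".

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a condition until it holds or a deadline passes. */
final class Await {
    private Await() {}

    static void until(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        // Re-check the condition until it holds; give up once the deadline has passed.
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

Unlike a fixed Thread.sleep(), this wait ends as soon as the condition is satisfied and fails loudly when it never is, so a consumer that did not get a partition surfaces as a clear test failure rather than a flaky assertion.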
Using a common Kafka instance requires setting the auto.offset.reset = latest parameter for consumers. For Spring applications, this is done as follows:
spring.kafka.consumer.auto-offset-reset=latest
The goal is to implement synchronous message sending to Kafka with acknowledgment from the partition leader. To achieve this, the acks = 1 parameter must be set for the producer. In a Spring application, this setting is specified as follows:
spring.kafka.producer.acks=1
When using KafkaTemplate for sending messages, it is important to make the send synchronous, because this component provides only an asynchronous interface: org.springframework.kafka.core.KafkaTemplate#send(org.springframework.messaging.Message<?>) returns CompletableFuture<SendResult<K, V>>. The following approach can be used for synchronous sending:
kafkaTemplate.send(message).get()
This blocks the calling thread until Kafka acknowledges the message, so execution continues only after the send has completed.
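A bare get() blocks indefinitely if the acknowledgment never arrives. As a hedged variation, the sketch below waits with a timeout and turns every failure mode into a single unchecked exception. The class SyncSend and the method awaitAck are hypothetical names introduced for illustration; the only assumption about Kafka is that the send call returns a CompletableFuture, as stated above.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: waits for an asynchronous send to be acknowledged. */
final class SyncSend {
    private SyncSend() {}

    static <T> T awaitAck(CompletableFuture<T> sendFuture, Duration timeout) {
        try {
            // Block until the future completes, but no longer than the timeout.
            return sendFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for acknowledgment", e);
        } catch (ExecutionException e) {
            // The send itself failed; expose the underlying cause.
            throw new IllegalStateException("Send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("No acknowledgment within " + timeout, e);
        }
    }
}
```

In application code the call would look like SyncSend.awaitAck(kafkaTemplate.send(message), Duration.ofSeconds(10)), where the ten-second limit is an arbitrary example value.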
Manual offset management means that the consumer commits an offset only after the corresponding message has been fully processed. In this context, the offset for topicA is committed only after the message has been successfully sent to topicB and the corresponding acknowledgment has been received.
To implement this logic, it is necessary to disable automatic offset commit for consumers by setting the enable.auto.commit = false parameter. In a Spring application, this is configured as follows:
spring.kafka.consumer.enable-auto-commit=false
Furthermore, the commit mechanism should be configured so that the offset is committed after processing each individual message, which is achieved by setting the parameter:
spring.kafka.listener.ack-mode=record
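Because an offset is committed only after a message has been fully processed, "all messages have been handled" can be expressed as "every partition's committed offset has reached its end offset". The sketch below shows that comparison on plain maps. It is an illustration under assumptions, not the actual implementation of KafkaSupport: the class OffsetCommitCheck, the method allCommitted, and the string partition keys such as "topicA-0" are hypothetical, and a partition with no committed offset is conservatively treated as still pending.

```java
import java.util.Map;

/** Hypothetical check: has the consumer group caught up on every partition? */
final class OffsetCommitCheck {
    private OffsetCommitCheck() {}

    /**
     * @param endOffsets       end offset per partition, keyed by a label such as "topicA-0"
     * @param committedOffsets offset committed by the consumer group, same keys
     * @return true when every committed offset has reached the partition's end offset
     */
    static boolean allCommitted(Map<String, Long> endOffsets, Map<String, Long> committedOffsets) {
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            long end = entry.getValue();
            // Assumption: a missing entry means nothing has been committed yet (offset 0).
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            if (committed < end) {
                return false; // at least one message is still being processed
            }
        }
        return true;
    }
}
```

In a real test the two maps would have to be read from the broker, for example through Kafka's AdminClient, and the predicate would be re-evaluated with a bounded wait until it returns true. How to treat partitions without any committed offset is a design decision that depends on the consumer configuration.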
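In summary, the approach relies on the following settings and tools:
Consumer offset reset: spring.kafka.consumer.auto-offset-reset=latest.
Producer acknowledgment: spring.kafka.producer.acks=1.
Synchronous sending: kafkaTemplate.send(message).get().
Manual offset commit: spring.kafka.consumer.enable-auto-commit=false, spring.kafka.listener.ack-mode=record.
Waiting for partition assignment: pw.avvero.emk.KafkaSupport#waitForPartitionAssignment.
Waiting for offset commit: pw.avvero.emk.KafkaSupport#waitForPartitionOffsetCommit.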
def "User Message Processing with OpenAI"() {
    setup:
    // (1) Wait until every consumer has been assigned a partition
    KafkaSupport.waitForPartitionAssignment(applicationContext)
    and:
    def openaiRequestCaptor = new RequestCaptor()
    // (2) Mock the OpenAI API and capture the outgoing request
    restMock.expect(manyTimes(), requestTo("//api.openai.com/v1/chat/completions"))
            .andExpect(method(HttpMethod.POST))
            .andExpect(openaiRequestCaptor)
            .andRespond(withSuccess('{"content": "Hi, how can i help you?"}', MediaType.APPLICATION_JSON))
    and:
    def telegramRequestCaptor = new RequestCaptor()
    // (3) Mock the Telegram API and capture the outgoing request
    restMock.expect(manyTimes(), requestTo("//api.telegram.org/sendMessage"))
            .andExpect(method(HttpMethod.POST))
            .andExpect(telegramRequestCaptor)
            .andRespond(withSuccess('{}', MediaType.APPLICATION_JSON))
    when:
    // (4) Deliver the incoming message through the webhook
    mockMvc.perform(post("/telegram/webhook")
            .contentType(APPLICATION_JSON_VALUE)
            .content("""{
                "message": {
                    "from": {
                        "id": 10000000
                    },
                    "chat": {
                        "id": 20000000
                    },
                    "text": "Hello!"
                }
            }""".toString())
            .accept(APPLICATION_JSON_VALUE))
            .andExpect(status().isOk())
    // (5) Wait until the offsets of all processed messages are committed
    KafkaSupport.waitForPartitionOffsetCommit(applicationContext)
    then:
    // (6) Verify the requests sent to OpenAI and Telegram
    openaiRequestCaptor.times == 1
    JSONAssert.assertEquals("""{
        "content": "Hello!"
    }""", openaiRequestCaptor.bodyString, false)
    and:
    telegramRequestCaptor.times == 1
    JSONAssert.assertEquals("""{
        "chatId": "20000000",
        "text": "Hi, how can i help you?"
    }""", telegramRequestCaptor.bodyString, false)
}
Here are the key steps described in the test scenario:
Additional tests, including scenarios with intensive message sending and the use of the @RetryableTopic mechanism for retries, are also available in the project repository, where they can be studied and adapted to your own development needs.