Back-end engineering nowadays often requires integrating multiple services, and installing all of them in a local development environment is painful. Docker provides an easier way to do this, but it still requires some scripting outside our code, and it has no clean solution for testing smaller functions or classes instead of the whole service. This problem is addressed by Testcontainers.
This article will give you an understanding of what Testcontainers is, why you should use it, and finally how to use it. While this article is based on experience with the Java programming language, Testcontainers also supports other programming languages, such as Go and Node.js.

It Takes a Few Lines
Say you want to use Kafka. You can easily define your Kafka instance in your test file as:

public class JavaKafkaTest {
    // The container lifecycle is managed manually in @Before/@After;
    // a JUnit 4 @Rule field would have to be initialized at declaration.
    private KafkaContainer kafka;

    @Before
    public void setUp() throws Exception {
        kafka = new KafkaContainer("5.4.2");
        kafka.start();
    }
}
This code downloads Confluent Platform's cp-kafka Docker image with version 5.4.2 (Kafka version 2.4) and starts the container, all in a few lines of code.

Kafka is coupled with ZooKeeper, but by default Testcontainers handles that for you. If you wish to use your own ZooKeeper for some reason, there is an option for that.
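As a minimal sketch, assuming an external ZooKeeper is reachable from the Kafka container's network (the connection string here is illustrative):

KafkaContainer kafka = new KafkaContainer("5.4.2")
        // Disable the embedded ZooKeeper and use the given connect string instead.
        .withExternalZookeeper("zookeeper:2181");
kafka.start();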
Full Example: Producing Messages and Consuming with Kafka Streams

I have shown you a little of how to create a KafkaContainer instance; now I will demonstrate a full class:

import java.util.Arrays;
import java.util.Properties;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;

public class JavaKafkaTest {
    private KafkaContainer kafka;

    @Before
    public void setUp() throws Exception {
        // The container must be started before any client can ask it for bootstrap servers.
        kafka = new KafkaContainer("5.4.2");
        kafka.start();
        Properties adminProperties = new Properties();
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG resolves to the same
        // "bootstrap.servers" key the admin client expects.
        adminProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        AdminClient adminClient = KafkaAdminClient.create(adminProperties);
        // Create the input topic with one partition and a replication factor of one.
        adminClient.createTopics(
                Stream.of("plaintext-input")
                        .map(n -> new NewTopic(n, 1, (short) 1))
                        .collect(Collectors.toList())
        ).all().get();
        adminClient.close();
    }

    @Test
    public void testKafkaTestcontainer() throws InterruptedException {
        prepareSeedMessages();
        final Topology topology = prepareKafkaTopology();
        final KafkaStreams streams = new KafkaStreams(topology, consumerProps());
        streams.start();
        // Give the Streams application time to consume and process the seed messages.
        Thread.sleep(5000);
        streams.close();
    }

    private void prepareSeedMessages() {
        // Publish three identical messages to the input topic.
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());
        producer.send(new ProducerRecord<>("plaintext-input", "this is sparta"));
        producer.send(new ProducerRecord<>("plaintext-input", "this is sparta"));
        producer.send(new ProducerRecord<>("plaintext-input", "this is sparta"));
        producer.close();
    }

    private Topology prepareKafkaTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
                .<String, String>stream("plaintext-input")
                .peek((k, v) -> {
                    Logger log = Logger.getLogger(Thread.currentThread().getName());
                    log.info(String.format("received message from plaintext-input : %s", v));
                })
                // Split each value into words on non-word characters.
                .flatMapValues(v -> Arrays.asList(v.split("\\W+")))
                .peek((k, v) -> {
                    Logger log = Logger.getLogger(Thread.currentThread().getName());
                    log.info(String.format("word after split : %s", v));
                })
                .groupBy((k, v) -> v)
                .count();
        return streamsBuilder.build();
    }

    private Properties consumerProps() {
        Properties consumerProps = new Properties();
        consumerProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        consumerProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        consumerProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "sample-app");
        // Take the bootstrap servers from the container; the broker is mapped to a random free port.
        consumerProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        return consumerProps;
    }

    private Properties producerProps() {
        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return producerProperties;
    }

    @After
    public void tearDown() {
        // Stops and removes the container, along with all data inside it.
        kafka.stop();
    }
}
In the setUp method, all the necessary components are prepared. The KafkaContainer must be started before any other calls. Remember that @Before is executed before every test method, so each test gets a fresh, isolated Kafka instance. On the other hand, this makes the container go up and down between test methods.

The testKafkaTestcontainer method is the runnable part. This is where you do your work and assert everything you need. This example demonstrates a complete publish and subscribe in a single method.

The prepareSeedMessages method populates the Kafka topic with three strings. In the properties, I set the standard required attributes: key serializer, value serializer, application id, and bootstrap servers. It is important that the bootstrap servers come from the Kafka container instance instead of being hard-coded, because Testcontainers maps the broker to a random free port on the host.

We then create a Kafka Streams topology. A topology is basically a directed acyclic graph (DAG) that defines the sequence of processing [4]. I will use the obligatory word-count topology example.
Here, the topology takes its input from the topic “plaintext-input”. I use peek to execute a logging step and pass the records through unchanged. Then, flatMapValues splits each value into tokens on whitespace. I can then group by each word and count the occurrences.

Finally, the tearDown method stops the Kafka container. This destroys the container and deletes all the data inside it.

If we have multiple tests in one class, it is better to move the start and stop calls to @BeforeClass and @AfterClass. Using @Before and @After as in this example results in the container being created and destroyed for every single test method, which makes running the test class very long. Instead, we can start the container once per test class.
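Here is a minimal sketch of that shared-container setup (the class name is illustrative); the container starts once before the first test and stops after the last one:

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.testcontainers.containers.KafkaContainer;

public class JavaKafkaSharedTest {
    // static: one container shared by all test methods in this class.
    private static KafkaContainer kafka;

    @BeforeClass
    public static void startContainer() {
        kafka = new KafkaContainer("5.4.2");
        kafka.start();
    }

    @AfterClass
    public static void stopContainer() {
        kafka.stop();
    }
}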