Kafka is a streaming platform used to process data in real time. In a world where data is king, Kafka is a valuable tool for developers and data engineers to learn. However, setting up Kafka locally can be frustrating, which can discourage learning. In this article, I will show you the fastest way to set up Kafka for development using Docker, and how this setup supports connections both from your local machine and from other local Docker containers.
This tutorial assumes that you have some knowledge of using Docker and docker-compose for development. If you are new to Docker, I recommend reading an introduction to it first. If you need to install Docker, follow its installation instructions.
Kafka categorizes data into topics. A topic is a category or feed name to which records are published.
Create a file called docker-compose.yml
and add the following content:
# docker-compose.yml
version: "3.7"
services:
  zookeeper:
    restart: always
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper-volume:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    restart: always
    image: docker.io/bitnami/kafka:3.3
    ports:
      - "9093:9093"
    volumes:
      - "kafka-volume:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
volumes:
  kafka-volume:
  zookeeper-volume:
Notice that we set a few environment variables for the Kafka container. For more information, see the image documentation. We have also defined a zookeeper service, because Kafka depends on Zookeeper to store metadata about topics and partitions. For development purposes, you don't need to interact with it, and you can safely ignore it for the time being.
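The listener variables are what make the two connection paths possible. As a rough illustration (it is not part of the setup itself), the sketch below parses the same values from the compose file with plain Python and prints which address each listener hands out to clients. The helper name `parse_listeners` is made up for this example.

```python
# Values copied from the kafka service's environment in docker-compose.yml.
LISTENERS = "CLIENT://:9092,EXTERNAL://:9093"
ADVERTISED_LISTENERS = "CLIENT://kafka:9092,EXTERNAL://localhost:9093"
SECURITY_PROTOCOL_MAP = "CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT"


def parse_listeners(value):
    """Turn 'NAME://host:port,...' into {NAME: (host, port)} (hypothetical helper)."""
    result = {}
    for entry in value.split(","):
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        result[name] = (host, int(port))
    return result


bound = parse_listeners(LISTENERS)
advertised = parse_listeners(ADVERTISED_LISTENERS)
protocols = dict(pair.split(":", 1) for pair in SECURITY_PROTOCOL_MAP.split(","))

for name, (host, port) in advertised.items():
    # An empty bind host means the broker listens on all interfaces.
    bind_host = bound[name][0] or "all interfaces"
    print(f"{name}: binds {bind_host}:{bound[name][1]}, "
          f"advertises {host}:{port} over {protocols[name]}")
```

Read this way, a client on your machine is told to reach the broker at localhost:9093, while a client inside the Docker network is told to use kafka:9092.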
To start the containers, run docker-compose up -d
and you should see the following output:
$ docker-compose up -d
Creating network "kafka-on-docker_default" with the default driver
Creating volume "kafka-on-docker_kafka-volume" with default driver
Creating volume "kafka-on-docker_zookeeper-volume" with default driver
Creating kafka-on-docker_zookeeper_1 ... done
Creating kafka-on-docker_kafka_1 ... done
$ kcat -b localhost:9093 -L # list all topics currently in kafka
Metadata for all topics (from broker 1: localhost:9093/1):
1 brokers:
broker 1 at localhost:9093 (controller)
0 topics:
Note that we are using localhost:9093
instead of the default 9092 port. This is because we are using the port that is exposed to our local machine.
$ kcat -b localhost:9093 -t test-topic -P # producer
one line per message
another line
The default delimiter between messages is a newline. When you are done, press Ctrl+D to send the messages. (FYI: pressing Ctrl+C will not work, and you will have to try again.)
$ kcat -b localhost:9093 -t test-topic -C # consumer
one line per message
another line
% Reached end of topic test-topic [0] at offset 2
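The last line of the consumer output can look cryptic: `[0]` is the partition, and `offset 2` is the position just past the two messages we produced. The toy model below is not how Kafka is implemented. It is a minimal in-memory sketch (the `ToyTopic` class is invented for illustration) of a single-partition topic as an append-only log, just to show where those numbers come from.

```python
class ToyTopic:
    """In-memory stand-in for one partition of a topic (illustration only)."""

    def __init__(self, name):
        self.name = name
        self.log = []  # append-only list; the list index plays the role of the offset

    def produce(self, value):
        self.log.append(value)
        return len(self.log) - 1  # offset assigned to this record

    def consume(self, from_offset=0):
        # Reading never removes records, so any consumer can replay the log.
        return list(enumerate(self.log))[from_offset:]


topic = ToyTopic("test-topic")
for line in "one line per message\nanother line".split("\n"):
    topic.produce(line)

for offset, value in topic.consume():
    print(offset, value)
print(f"% Reached end of topic {topic.name} [0] at offset {len(topic.log)}")
```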
Publishing arbitrary text is not quite what we want, though, so let's try publishing JSON messages instead. To make this easier to extend, we will write some Python scripts to produce and consume messages.
To start, you must install the kafka-python library. You can do this by running pip install kafka-python, or, if you are more advanced, use a virtualenv and install from requirements.txt instead.
# producer.py
from kafka import KafkaProducer
from datetime import datetime
import json

# serialize every value to UTF-8 encoded JSON before sending
producer = KafkaProducer(
    bootstrap_servers=['localhost:9093'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('posts', {'author': 'choyiny', 'content': 'Kafka is cool!', 'created_at': datetime.now().isoformat()})
# send() is asynchronous; flush so the message is delivered before the script exits
producer.flush()
Below is an example of a Python consumer that subscribes to posts
and prints out every value.
# consumer.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'posts',
    bootstrap_servers=['localhost:9093'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# note that this for loop will block forever to wait for the next message
for message in consumer:
    print(message.value)
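Before involving a broker, it can help to confirm that the serializer and deserializer are inverses of each other. The sketch below reuses the same two lambdas from the scripts above and runs them back to back, without Kafka. The fixed timestamp is only an example value.

```python
import json
from datetime import datetime

# The same callables passed to KafkaProducer and KafkaConsumer above.
value_serializer = lambda v: json.dumps(v).encode('utf-8')
value_deserializer = lambda m: json.loads(m.decode('utf-8'))

post = {
    'author': 'choyiny',
    'content': 'Kafka is cool!',
    'created_at': datetime(2023, 1, 1, 12, 0, 0).isoformat(),  # fixed example timestamp
}

wire_bytes = value_serializer(post)       # what the producer hands to Kafka
decoded = value_deserializer(wire_bytes)  # what the consumer yields as message.value

print(type(wire_bytes).__name__, wire_bytes)
print(decoded == post)
```

Kafka itself only ever sees the bytes. The JSON structure is a convention shared between the producer and its consumers.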
Create a file called clickhouse.docker-compose.yml
and add the following content:
# clickhouse.docker-compose.yml
version: "3.7"
services:
  clickhouse:
    restart: always
    image: clickhouse/clickhouse-server
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      - "clickhouse-volume:/var/lib/clickhouse/"
volumes:
  clickhouse-volume:
Now we are ready to connect to Clickhouse. Start the container with docker-compose -f clickhouse.docker-compose.yml up -d, then run the following command to open a client session inside it:
$ docker-compose -f clickhouse.docker-compose.yml exec clickhouse clickhouse-client
ClickHouse client version 22.11.2.30 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 22.11.2 revision 54460.
Warnings:
* Linux is not using a fast clock source. Performance can be degraded. Check /sys/devices/system/clocksource/clocksource0/current_clocksource
0cd6f3269407 :)
This is an interactive shell that allows you to run SQL commands. To create a table, run the following command:
-- create messages queue
CREATE TABLE default.message_queue
(
    created_at DateTime,
    content String,
    author String
)
ENGINE = Kafka(
    'kafka:9092',
    'posts',
    'clickhouse',
    'JSONEachRow'
) SETTINGS kafka_thread_per_consumer = 1, kafka_num_consumers = 1;
Note how we are using kafka:9092 as the connection string instead of localhost:9093, which we used when connecting to Kafka locally. This is because Clickhouse connects to Kafka through the internal Docker network.
-- create messages table
CREATE TABLE default.messages
(
    created_at DateTime,
    content String,
    author String
)
ENGINE = MergeTree
ORDER BY created_at;

-- create materialized view
CREATE MATERIALIZED VIEW default.messages_mv
TO default.messages
AS SELECT * FROM default.message_queue;
I'll leave it as an exercise to the reader to come up with the SQL queries to select the data in the messages
table.
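If you would rather check the table from a script than from clickhouse-client, one option is Clickhouse's HTTP interface, which the compose file above publishes on port 8123. This is a minimal sketch using only the standard library. It assumes the default user with no password, the helper names are invented, and the query is just one example rather than the only answer to the exercise.

```python
from urllib.parse import quote, urlencode
from urllib.request import urlopen

CLICKHOUSE_URL = "http://localhost:8123/"  # HTTP port published in the compose file


def build_query_url(sql, base_url=CLICKHOUSE_URL):
    """Build a GET URL for a read-only query (hypothetical helper name)."""
    # quote_via=quote encodes spaces as %20 instead of '+'.
    return base_url + "?" + urlencode({"query": sql}, quote_via=quote)


def run_query(sql):
    """Send the query and return the raw response body as text."""
    with urlopen(build_query_url(sql), timeout=5) as response:
        return response.read().decode("utf-8")


if __name__ == "__main__":
    sql = "SELECT count() FROM default.messages"
    print(build_query_url(sql))
    try:
        print(run_query(sql))
    except OSError as error:  # URLError and timeouts are OSError subclasses
        print(f"Could not reach Clickhouse: {error}")
```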
For our demo app, we will create an API endpoint, POST /posts, in Python Flask. Instead of saving each post to a database directly, we are going to produce it to the Kafka topic posts. Follow along below or clone the accompanying repository. On top of that, since we have set up Clickhouse to consume messages from Kafka, we will be able to see the messages in the Clickhouse UI.
Here's the straightforward code for the API endpoint:
# app.py
# fun fact: This snippet was generated entirely by Copilot
from flask import Flask, request
from kafka import KafkaProducer
from datetime import datetime
import json

app = Flask(__name__)

# flask run executes on the host, so connect through the EXTERNAL listener
producer = KafkaProducer(
    bootstrap_servers=['localhost:9093'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)


@app.route('/posts', methods=['POST'])
def create_post():
    post = request.get_json()
    # clickhouse can only parse strings without milliseconds
    post['created_at'] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    producer.send('posts', post)
    return 'ok'
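The comment about milliseconds is worth a closer look, because producer.py earlier used datetime.now().isoformat(), which appends microseconds whenever they are non-zero. The snippet below uses a fixed example timestamp to show the difference between the two formats. isoformat(timespec="seconds") is a standard-library alternative that yields the same string as the strftime call.

```python
from datetime import datetime

# Fixed example value so the output is reproducible.
moment = datetime(2023, 1, 2, 3, 4, 5, 678901)

with_fraction = moment.isoformat()                       # format used in producer.py
without_fraction = moment.strftime("%Y-%m-%dT%H:%M:%S")  # format used in app.py
alternative = moment.isoformat(timespec="seconds")       # same result, no format string

print(with_fraction)     # 2023-01-02T03:04:05.678901
print(without_fraction)  # 2023-01-02T03:04:05
print(alternative == without_fraction)  # True
```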
To run the app, you can use the following command:
$ flask run
After running, you can try out the endpoint by executing a curl command:
$ curl -X POST -H "Content-Type: application/json" -d '{"author": "choyiny", "content": "Kafka is cool!"}' http://localhost:5000/posts
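The same request can be sent from Python, which is handy for scripted smoke tests. This sketch uses only the standard library and assumes Flask is listening on its default development address, localhost:5000, as in the curl command above. `build_post_request` is a hypothetical helper name.

```python
import json
from urllib.request import Request, urlopen

API_URL = "http://localhost:5000/posts"  # Flask's default development address


def build_post_request(author, content, url=API_URL):
    """Build the same JSON POST request as the curl command (hypothetical helper)."""
    body = json.dumps({"author": author, "content": content}).encode("utf-8")
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_post_request("choyiny", "Kafka is cool!")
    try:
        with urlopen(request, timeout=5) as response:
            print(response.status, response.read().decode("utf-8"))
    except OSError as error:  # raised when the Flask app is not running
        print(f"Request failed: {error}")
```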
It would be cool if it worked on the first try. If it doesn't, you can use the following command to pull the logs from Clickhouse and inspect errors:
$ docker-compose -f clickhouse.docker-compose.yml exec clickhouse tail -f /var/log/clickhouse-server/clickhouse-server.log
To remove corrupted messages from a Kafka topic, it is sufficient to delete the topic entirely with this command:
$ docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic posts
Ideally, there are no errors. I'll leave it as an exercise to the reader to connect to Clickhouse again and verify that data from Kafka is being consumed.