Microservices, Machine Learning & Big Data are making waves among organizations. Curiously, they all share the same biggest concern: data.
In today’s fast-moving world, where immediate access to information is no longer a nice-to-have but critical for business competitiveness and survival, ETL processes that run on a periodic basis are no longer the best option. Real-time (or near real-time) alternatives such as streaming make it possible to process large volumes of data while reacting instantly to changing conditions, but sometimes at the expense of dedicating many people to building an infrastructure capable of doing so while addressing issues like fault tolerance, performance, scalability and reliability. Therefore, we’ll go through a way of doing it without writing a single line of code.
Change Data Capture (CDC) w/ Kafka & Kafka Connect
Entity Relationship Diagram (ERD)
Now, let’s imagine that, for some reason, this store needs to share part of its data with a third-party client that operates on a bigger scale. Let’s also assume that this client doesn’t rely only on relational databases. Its data is in fact replicated across multiple instances and supported by different technologies with distinct purposes (analytics, searching, machine learning, etc.). This setup creates multiple challenges:
services:
  kafka:
    container_name: bike-store-kafka
    image: landoop/fast-data-dev:2.5
    environment:
      ADV_HOST: 127.0.0.1 # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0 # Disable running tests so the cluster starts faster
      CONNECTORS: debezium-postgres # Allows only described connectors
      SAMPLEDATA: 0 # Disable sample data topic creation
      RUNNING_SAMPLEDATA: 0 # Disable sample data
    ports:
      - 2181:2181 # Zookeeper
      - 3030:3030 # Landoop UI
      - 8081-8083:8081-8083 # Schema Registry, REST Proxy, & Kafka Connect
      - 9581-9585:9581-9585 # JMX Ports
      - 9092:9092 # Kafka Broker
    networks:
      - bike-store
  postgres:
    container_name: bike-store-postgres
    image: debezium/postgres:12-alpine
    environment:
      POSTGRES_DB: store # PostgreSQL database
      POSTGRES_USER: postgres # PostgreSQL user
      POSTGRES_PASSWORD: postgres # PostgreSQL password
    ports:
      - 5432:5432
    volumes:
      - /{YOUR_FOLDER_LOCATION}/1-schema.sql:/docker-entrypoint-initdb.d/1-schema.sql
      - /{YOUR_FOLDER_LOCATION}/2-data.sql:/docker-entrypoint-initdb.d/2-data.sql
    networks:
      - bike-store
networks:
  bike-store:
    name: bike-store-network
Docker Compose
You may have noticed that we’re starting our database container with a volume. This volume mounts two files on the container, which are then used as initialization scripts. This is important because it not only provides an initial state by creating and seeding our tables, but also allows Debezium’s plugin to perform an initial snapshot of our database. Just don’t forget to download both files and change the volume’s mount source to your local folder.
Next, change your directory to the folder where you’ve downloaded your docker-compose.yml file and set everything up by running:
$ docker-compose up -d
List running containers:
$ docker ps -f network=bike-store-network
Read logs from database container:
$ docker logs bike-store-postgres
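Besides reading the logs, you can ask Kafka Connect itself whether Debezium’s plugin has been loaded. The following is only a minimal sketch: it assumes that the Kafka Connect REST API is reachable at http://localhost:8083 (the port published in the Docker Compose file) and it relies on the REST API’s /connector-plugins endpoint. The helper names are made up for illustration.

```python
import json
import urllib.request

# Assumption: Kafka Connect's REST port (8083) is published on localhost.
CONNECT_URL = "http://localhost:8083"
DEBEZIUM_CLASS = "io.debezium.connector.postgresql.PostgresConnector"


def has_plugin(plugins, class_name):
    """Return True if a /connector-plugins listing contains the given class."""
    return any(plugin.get("class") == class_name for plugin in plugins)


def fetch_plugins(base_url=CONNECT_URL, timeout=5):
    """Call GET /connector-plugins and return the decoded JSON list."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins", timeout=timeout) as resp:
        return json.load(resp)


# Offline example of the shape returned by the endpoint (version is illustrative).
SAMPLE_LISTING = [{"class": DEBEZIUM_CLASS, "type": "source", "version": "unknown"}]
print(has_plugin(SAMPLE_LISTING, DEBEZIUM_CLASS))
```

Once the containers are up, has_plugin(fetch_plugins(), DEBEZIUM_CLASS) should tell you whether the connector from the CONNECTORS variable is available.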
Initial UI
By default, four system topics are created automatically. They store not only schema and connector configurations, but also connector offsets and status, so there’s no need to worry about managing these settings.
The next step is to create a Kafka Connect source connector. To do so, just use the Kafka Connect view of the Landoop UI (published on port 3030 in the Docker Compose file above). After accessing it, press “New” and choose “PostgresConnector” from the available options. This connector is in fact Debezium’s PostgreSQL Connector, so just go ahead and copy and paste the following properties:
# https://debezium.io/docs/connectors/postgresql/
# Unique name for the source connector.
name=bike-store-source-connector
# The name of the Java class for the connector.
connector.class=io.debezium.connector.postgresql.PostgresConnector
# The maximum number of tasks that should be created for this connector.
tasks.max=1
# The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance.
slot.name=debezium
# Logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored.
database.server.name=limadelrey
# IP address or hostname of the PostgreSQL database server.
database.hostname=postgres
# Integer port number of the PostgreSQL database server.
database.port=5432
# Name of the PostgreSQL database user to use when connecting to the PostgreSQL database server.
database.user=postgres
# Password to use when connecting to the PostgreSQL database server.
database.password=postgres
# The name of the PostgreSQL database from which to stream the changes.
database.dbname=store
# An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored.
table.whitelist=store.customers,store.products,store.orders,store.order_items
Source connector
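If you’d rather script this step than click through the UI, the same properties can be sent to Kafka Connect’s REST API (POST /connectors). The sketch below is an assumption-laden alternative, not the approach used in this article: it assumes the REST API is reachable at http://localhost:8083, and the helper names are hypothetical. It only builds the request; sending it is left commented out.

```python
import json
import urllib.request

PROPERTIES = """
name=bike-store-source-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
slot.name=debezium
database.server.name=limadelrey
database.hostname=postgres
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=store
table.whitelist=store.customers,store.products,store.orders,store.order_items
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def build_create_request(config, base_url="http://localhost:8083"):
    """Build (but do not send) a POST /connectors request."""
    config = dict(config)        # work on a copy
    name = config.pop("name")    # the REST payload carries the name separately
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(parse_properties(PROPERTIES))
# urllib.request.urlopen(request)  # uncomment to actually create the connector
```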
The source connector properties are pretty self-explanatory. Given a PostgreSQL instance and its credentials, the connector is able to consume every change made to the whitelisted tables just by listening to the database transaction log. Since the database is already populated, the connector performs an initial snapshot of the database, automatically creating a topic for each table and filling it with a message for every existing row:
Kafka topics
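The topic names follow Debezium’s convention for PostgreSQL: the logical server name, then the schema name, then the table name, separated by dots. As a small sketch, and assuming the whitelist entries are plain schema.table names rather than regular expressions, the expected topics can be derived from the connector properties like this (the function name is just for illustration):

```python
def expected_topics(server_name, table_whitelist):
    """Derive topic names as <server name>.<schema>.<table>.

    Assumes each whitelist entry is a literal 'schema.table' identifier.
    """
    tables = [entry.strip() for entry in table_whitelist.split(",")]
    return [f"{server_name}.{table}" for table in tables if table]


topics = expected_topics(
    "limadelrey",
    "store.customers,store.products,store.orders,store.order_items",
)
for topic in topics:
    print(topic)
```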
These topics are divided into partitions, and Kafka distributes those partitions across the brokers of the cluster. They keep data in the cluster until a configurable retention period has passed, and they can be replicated for backup and high availability purposes. This is especially important because it allows more consumers to subscribe over time while the data is retained. Regarding data, every message produced by Debezium’s connector has a key and a value. The message’s key holds the value of the table’s primary key (which makes it possible to track all changes made to a row as a sequence of events), while the message’s value holds the row’s fields in their previous and current state, plus additional metadata about the message generation. It follows a format conveniently defined by Debezium with the following fields:
Message on customers topic
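To make the key/value format more concrete, here is a rough sketch of how a downstream consumer could rebuild a table from these messages. It uses the before, after and op fields of Debezium’s envelope, where op is "c" for create, "u" for update, "d" for delete and "r" for a snapshot read. The column names and sample events are invented for illustration, and the messages are assumed to be already deserialized into dictionaries.

```python
def key_of(key):
    """Turn a message key (a dict of primary key columns) into a hashable value."""
    return tuple(sorted(key.items()))


def apply_change(state, key, value):
    """Apply one change event to an in-memory replica of a table."""
    if value is None:                 # tombstone that follows a delete
        state.pop(key_of(key), None)
        return state
    op = value["op"]
    if op in ("c", "r", "u"):         # create, snapshot read, update
        state[key_of(key)] = value["after"]
    elif op == "d":                   # delete
        state.pop(key_of(key), None)
    return state


# Hypothetical events for the customers topic.
EVENTS = [
    ({"customer_id": 1}, {"before": None, "after": {"customer_id": 1, "first_name": "Ana"}, "op": "r"}),
    ({"customer_id": 1}, {"before": {"customer_id": 1, "first_name": "Ana"},
                          "after": {"customer_id": 1, "first_name": "Anna"}, "op": "u"}),
    ({"customer_id": 2}, {"before": None, "after": {"customer_id": 2, "first_name": "Rui"}, "op": "c"}),
    ({"customer_id": 2}, {"before": {"customer_id": 2, "first_name": "Rui"}, "after": None, "op": "d"}),
    ({"customer_id": 2}, None),
]

replica = {}
for event_key, event_value in EVENTS:
    apply_change(replica, event_key, event_value)
```

Because every event for a row shares the same key, replaying the topic in order leaves the replica with the latest state of each row.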
Alongside topic creation, the source connector also creates an Avro schema for each message key and each message value, which enforces the format presented previously. Avro schemas are very popular in data serialisation because they let you adopt a data format and enforce rules that enable schema evolution without breaking downstream applications:
Bike store schemas
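One of those schema evolution rules is easy to illustrate: a new schema can only read data written with the old one if every added field declares a default. The sketch below checks just that single rule on schemas written as plain dictionaries; it is a simplification, not the Schema Registry’s actual compatibility check. The record and field names are hypothetical, and the subject names assume the registry’s default naming strategy of topic name plus "-key" or "-value".

```python
def subjects_for(topic):
    """Subject names under the default topic-based naming strategy (assumption)."""
    return [f"{topic}-key", f"{topic}-value"]


def added_fields_without_default(old_schema, new_schema):
    """List fields added in new_schema that lack a default value.

    An empty list means this one backward-compatibility rule is satisfied.
    """
    old_names = {field["name"] for field in old_schema["fields"]}
    return [
        field["name"]
        for field in new_schema["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


# Hypothetical schemas for a customer row.
OLD_SCHEMA = {"type": "record", "name": "Value", "fields": [
    {"name": "customer_id", "type": "int"},
    {"name": "first_name", "type": "string"},
]}
SAFE_SCHEMA = {"type": "record", "name": "Value", "fields": OLD_SCHEMA["fields"] + [
    {"name": "phone", "type": ["null", "string"], "default": None},
]}
BREAKING_SCHEMA = {"type": "record", "name": "Value", "fields": OLD_SCHEMA["fields"] + [
    {"name": "phone", "type": "string"},
]}

print(added_fields_without_default(OLD_SCHEMA, SAFE_SCHEMA))
print(added_fields_without_default(OLD_SCHEMA, BREAKING_SCHEMA))
```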
Finally, the UI should look as follows:
Final UI
That’s it! With this setup you’re able to . It’s prepared to scale horizontally, or even to move to a different machine later on, since it takes advantage of Kafka Connect’s distributed mode. Moreover, with Kafka acting as the backbone, you can use it as a central integration point for numerous data sinks like Neo4j, MongoDB, Elasticsearch and so on:
Kafka as central integration point
As you can see, it can be easy to set up your data pipeline by choosing the right tools. You don’t need deep knowledge of Kafka and Zookeeper configurations to start using Kafka Connect, nor do you need to spend hours and hours trying to understand how to get all of these tools working together. By following this approach you’ll be on the right path to providing a great infrastructure while focusing on your biggest concern: data.
You can find all the necessary configurations on the following .
Previously published at