Building an enterprise data warehouse can be either relatively straightforward or very sophisticated, depending on factors such as the complexity of the conceptual data model and the variety of source systems. In many cases, applying a change data capture (CDC) approach can make the data integration simpler. Fortunately, there are plenty of CDC tools available on the market; many are easy to use and affordable, while others are cumbersome and expensive for what they offer.
What I am interested in doing is moving the data from a SQL Server database to BigQuery without too much hassle, such as changing firewall rules. Many enterprise solutions can achieve this out of the box. The selection process should take into account at least the following.
- Environment - A self-hosted or a fully managed solution. If it is the latter, regulatory and compliance issues require more thorough consideration.
- Connectors - The availability of connectors for the source and target systems.
- Quality Attributes - Performance, robustness, and reliability of the solution.
- Pricing - A free, connector-based, or consumption-based pricing model.
I have recently looked at an open-source CDC tool, Debezium, and found it fascinating, especially the Debezium Server, which is an incubating feature.
Testing Debezium
To test this out, I use the publicly available Stack Overflow sample database (~10GB) with a simple setup as follows.
Configure a SQL Server
- Restore the database using the MDF, NDF, and LDF files. At present, Cloud SQL does not support CDC, so to test this, either use a self-managed SQL Server instance or a SQL Server container (the container must run with the environment variable `MSSQL_AGENT_ENABLED=True` to enable the SQL Server Agent).
- Enable CDC on the database and all tables.
- Start the CDC job on the SQL Server (a sketch of these steps from Python follows below).
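For reference, here is a minimal sketch of enabling CDC from Python with `pyodbc`. The connection details are placeholders in the style of the configuration later in this post, and the table name is only an example; adjust them to the restored database.

```python
# Sketch: enable CDC on a restored database and one of its tables.
# Assumes pyodbc and the ODBC Driver 17 for SQL Server are installed;
# the connection details and object names are placeholders.
import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<IP_ADDRESS_OR_HOSTNAME>,1433;"
    "DATABASE=<DB_NAME>;UID=<USERNAME>;PWD=<PASSWORD>",
    autocommit=True,
)
cursor = conn.cursor()

# Enable CDC at the database level (creates the cdc schema and capture objects).
cursor.execute("EXEC sys.sp_cdc_enable_db")

# Enable CDC for a single table; repeat for every table to capture.
cursor.execute(
    "EXEC sys.sp_cdc_enable_table "
    "@source_schema = N'dbo', "
    "@source_name = N'Users', "
    "@role_name = NULL"
)
conn.close()
```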
Set Up a Debezium Server
- Download and extract the Debezium Server distribution.
- Start a ZooKeeper server.
- Start a Kafka server.
- Create a file `conf/application.properties` as follows:
```properties
debezium.sink.type=pubsub
debezium.sink.pubsub.projectid=<PROJECT_ID>
debezium.sink.pubsub.ordering.enabled=true
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=<IP_ADDRESS_OR_HOSTNAME>
debezium.source.database.port=1433
debezium.source.database.user=<USERNAME>
debezium.source.database.password=<PASSWORD>
debezium.source.database.dbname=<DB_NAME>
debezium.source.database.server.name=<SERVER_NAME>
debezium.source.schema.whitelist=<SCHEMA>
debezium.source.database.history.kafka.bootstrap.servers=<KAFKA_SERVER>
debezium.source.database.history.kafka.topic=<KAFKA_TOPIC>
```
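For context, each change event that the Debezium Server publishes to Pub/Sub is a JSON envelope whose `payload` section carries the row state and metadata. Roughly, and with purely illustrative values, the fields used later by the HTTP function look like this:

```python
# Rough shape of a Debezium change event as delivered via Pub/Sub.
# Values are illustrative; the real envelope also carries a "schema"
# section describing the field types, which this test does not use.
event = {
    "payload": {
        "before": None,                                # row state before the change (None for inserts)
        "after": {"Id": 1, "DisplayName": "Example"},  # row state after the change
        "source": {
            "db": "<DB_NAME>",                         # used below as the BigQuery dataset id
            "table": "Users",                          # used below as the BigQuery table id
        },
        "op": "c",               # c = create, u = update, d = delete, r = snapshot read
        "ts_ms": 1600000000000,  # time the connector processed the event
    }
}
```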
Create and Start the Processing Pipeline
- Create a BigQuery dataset and tables (each table must include the columns `op` and `ts_ms`).
- Create a Pub/Sub topic for each table and one for the database.
- Create an HTTP-triggered Cloud Function to listen to Pub/Sub push subscriptions. This test streams only the latest data, the operation, and the timestamp into BigQuery.

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
This HTTP function is responsible for:
- Parsing the messages published by the Debezium Server.
- Inserting the records into the relevant BigQuery tables.
This HTTP function is not responsible for:
- Handling or logging errors.
- Validating the input.
- Deduplicating data.
'''
import base64
import json

from google.cloud import bigquery

BQ = bigquery.Client()


def stream_dbz_message(request):
    '''This function is executed whenever the endpoint is called.'''
    # The Pub/Sub push subscription wraps the Debezium event in a
    # base64-encoded "message.data" field.
    request_json = request.get_json(silent=True)
    data = request_json["message"]["data"]
    data = base64.b64decode(data).decode('utf-8')
    data = json.loads(data)

    # Keep only the latest row state plus the operation and its timestamp.
    payload = data["payload"]
    record = payload["after"]
    record["op"] = payload["op"]
    record["ts_ms"] = payload["ts_ms"]

    # The source database and table names map to the BigQuery dataset and table.
    dataset_id = payload["source"]["db"]
    table_id = payload["source"]["table"]
    table = BQ.dataset(dataset_id).table(table_id)

    # Insert errors are intentionally ignored in this quick test.
    _ = BQ.insert_rows_json(table=table, json_rows=[record])
    return ('', 204)
```
- Create a subscription for each topic, with the HTTP function above as the push endpoint (a quick local test of the endpoint is sketched after this list).
- Run `./run.sh` to start the Debezium Server and begin the CDC process.
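To sanity-check the function before creating the real push subscriptions, a Pub/Sub-style push request can be simulated. This is a minimal sketch assuming the function is served locally (for example with the Functions Framework) at a hypothetical URL; the dataset and table names are placeholders.

```python
# Sketch: simulate a Pub/Sub push request against the function above.
# Assumes the function is running locally, e.g. via
# `functions-framework --target=stream_dbz_message`.
import base64
import json

import requests

FUNCTION_URL = "http://localhost:8080"  # hypothetical local endpoint

# A minimal Debezium-style event containing only the fields the function reads.
event = {
    "payload": {
        "after": {"Id": 1, "DisplayName": "Example"},
        "source": {"db": "<DB_NAME>", "table": "Users"},
        "op": "c",
        "ts_ms": 1600000000000,
    }
}

# Pub/Sub push subscriptions deliver the event base64-encoded under message.data.
push_body = {
    "message": {
        "data": base64.b64encode(json.dumps(event).encode("utf-8")).decode("utf-8")
    }
}

response = requests.post(FUNCTION_URL, json=push_body)
print(response.status_code)  # expect 204 from the function above
```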
The initial load of all the tables took quite a while to complete, as it was configured to run sequentially. The HTTP function above was written for a quick test and should not be used in production. To make it production-ready, it needs better error handling, a recovery mechanism, and a deduplication method to guarantee high-quality delivery.

With support for SQL Server coming soon, it might be more convenient for those who are already on GCP to use a Dataflow-based CDC solution to stream data directly into BigQuery in a more scalable and controllable way. For example, the embedded Debezium connector can be run in single-topic mode, publishing all updates for a database to a single Pub/Sub topic.

I would also like to try out a framework that is built on an open connector specification and already has a few log-based connectors. It seems that the industry is shying away from locked-in ecosystems and aggressive price points at a rapid pace. Debezium and many other open-source tools are still cumbersome, but they are evolving in both quality and usability. When it is not feasible to use fully managed services, they can become great options.