Topology
```
+--------------+
|              |
|  PostgreSQL  |
|              |
+------+-------+
       |
       |
       |
+------v---------------------------+
|                                  |
|          Kafka Connect           |
|   (Debezium, JDBC connectors)    |
|                                  |
+---------------+------------------+
                |
                |
                |
        +-------v--------+
        |                |
        |   PostgreSQL   |
        |                |
        +----------------+
```
Setup Docker services using a docker-compose file
I'm using a Docker environment for this; if you want to try it on your host machine, you can use the Confluent Platform installed on your OS.
version: "3.7" services: postgres: image: debezium/postgres:13 container_name: pg1 expose: - 5433 ports: - 5433:5433 environment: - POSTGRES_USER=mine - POSTGRES_PASSWORD=qwepoi123 - POSTGRES_DB=sharingcdc - PGPORT=5433 postgres-copy: image: debezium/postgres:13 container_name: pg2 expose: - 5434 ports: - 5434:5434 environment: - POSTGRES_USER=mine - POSTGRES_PASSWORD=qwepoi123 - POSTGRES_DB=sharingcdc-copy - PGPORT=5434 zookeeper: image: confluentinc/cp-zookeeper:5.5.3 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-enterprise-kafka:5.5.3 container_name: kafka links: - zookeeper depends_on: [zookeeper] environment: KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_BROKER_ID: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9991 ports: - 9092:9092 debezium: image: dbz-conn-jdbc-sink container_name: dbz links: - kafka - postgres - postgres-copy environment: BOOTSTRAP_SERVERS: kafka:9092 GROUP_ID: 1 CONFIG_STORAGE_TOPIC: connect_configs OFFSET_STORAGE_TOPIC: connect_offsets STATUS_STORAGE_TOPIC: connect-status KEY_CONVERTER: io.confluent.connect.avro.AvroConverter VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 depends_on: [kafka] ports: - 8083:8083 schema-registry: image: confluentinc/cp-schema-registry:5.5.3 container_name: schema environment: - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 - SCHEMA_REGISTRY_HOST_NAME=schema-registry - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081 ports: - 8081:8081 depends_on: [zookeeper, kafka] kafkacat: image: edenhill/kafkacat:1.5.0 container_name: kafkacat entrypoint: - /bin/sh - -c - | while [ 1 -eq 1 ];do sleep 60;done
Setup Database
First, we need the first database to have some data. Open a bash session in `pg1` (as we named it) and log in:

```sh
psql -U mine -d sharingcdc -W
```

The `-W` option will prompt us for the password to connect. Once we are in psql mode, create the table:

```sql
CREATE TABLE public.todo (
    id uuid NOT NULL,
    created_date timestamp NULL DEFAULT now(),
    is_done bool NULL,
    modified_date timestamp NULL,
    title varchar(255) NULL,
    CONSTRAINT todo_pkey PRIMARY KEY (id)
);
```
To enable CDC with Postgres, we also need to set the table's replica identity to FULL so that change events carry the complete row:

```sql
ALTER TABLE public.todo REPLICA IDENTITY FULL;
```
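To double-check that the replica identity took effect, you can query `pg_class`; `relreplident` should now be `f` (FULL). A small sanity check, run from the host:

```sh
# relreplident: d = default (primary key), f = full, n = nothing, i = index
docker exec -e PGPASSWORD=qwepoi123 pg1 \
  psql -U mine -d sharingcdc -c "SELECT relname, relreplident FROM pg_class WHERE relname = 'todo';"
```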
Let's insert some data into it:

```sql
INSERT INTO public.todo (id, created_date, is_done, modified_date, title) VALUES('79fe5ffa-94ed-4871-a5cd-300471586914'::uuid, '2022-05-31 14:47:12.198', false, '2022-06-20 22:52:10.648', 'do laundry');
INSERT INTO public.todo (id, created_date, is_done, modified_date, title) VALUES('129eef91-8f55-4edd-9c63-804c6f1a3f5b'::uuid, '2022-05-31 14:59:58.150', false, '2022-06-20 22:52:11.481', 'feed the dog');
```
Creating Connectors
Next, we need to create a connector using Debezium to access our first database and stream its changes to a Kafka topic. Fortunately, it's quite easy to do: the Debezium connector is managed through Kafka Connect's REST API.
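Before creating anything, it's worth confirming that the Connect REST API is actually up. A quick check from the host (assuming the stack from the compose file above is running):

```sh
# Returns worker version info if the Kafka Connect REST API is reachable
curl -s localhost:8083/ | jq

# No connectors exist yet, so this returns an empty list
curl -s localhost:8083/connectors | jq
```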
There are two connectors we need to create: one for the source, the process by which change streams are inserted into a topic, and the other for the sink, which takes data out of a topic.

source connector

To create the source connector, we need to use the `PostgresConnector` class. Below is an example config.

`debezium-source-config.json` (for the publisher / source connector):

```json
{
  "name": "sharingcdc-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5433",
    "database.user": "mine",
    "database.password": "qwepoi123",
    "database.dbname": "sharingcdc",
    "database.server.name": "postgres",
    "database.include.list": "sharingcdc",
    "tasks.max": 1,
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
```
Finally, we can just use `curl` to create the connector:

```sh
curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-source-config.json"
```
The response we get will be:
```
HTTP/1.1 201 Created
Date: Tue, 12 Jul 2022 09:09:28 GMT
Location: http://localhost:8083/connectors/sharingcdc-connector
Content-Type: application/json
Content-Length: 608
Server: Jetty(9.4.33.v20201020)

{
  "name": "sharingcdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5433",
    "database.user": "mine",
    "database.password": "qwepoi123",
    "database.dbname": "sharingcdc",
    "database.server.name": "postgres",
    "database.include.list": "sharingcdc",
    "tasks.max": "1",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "name": "sharingcdc-connector"
  },
  "tasks": [],
  "type": "source"
}
```
Notice that the HTTP status `201 Created` tells us the connector was created successfully.

Check connector status

```sh
curl localhost:8083/connectors/sharingcdc-connector/status
```
This will give the following result:

```json
{
  "name": "sharingcdc-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.21.0.6:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.21.0.6:8083"
    }
  ],
  "type": "source"
}
```
Important!
If you want to stop a connector, it is important to delete it before starting a new connector with the same name.
In the example above, we declared the connector name as `sharingcdc-connector`; to delete this connector we simply send a DELETE request to the API:

```sh
curl -X DELETE localhost:8083/connectors/sharingcdc-connector
```

To confirm that the connector has been deleted, list the connectors we have using:

```sh
curl -X GET localhost:8083/connectors
```
The `/connectors` endpoint is the root context for Debezium connectors, and sending a GET request without any parameters returns the list of available connectors.

Checking your topic
According to the official documentation of the Debezium connector for Postgres (Postgres Topic Names), the topic name is generated automatically as `server name` + `schema name` + `table name`. So in our case, since we defined `database.server.name` in the `debezium-source-config.json` file as `postgres`, our schema name is `public`, and the table is `todo`, the constructed topic name will be `postgres.public.todo`.
That should be it, right? Nope! If you look closely at our `debezium-source-config.json`, we defined a route transformation that rewrites the default generated topic name to just the table name, using the `$3` capture group:

```json
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
```
So in our case, the name of our topic will be `todo` instead of `postgres.public.todo`.
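If you want to see what that regex does in isolation, here is the same pattern applied with plain `sed`, purely as an illustration of the capture groups:

```sh
# The third capture group ($3 in the connector config, \3 here) is what RegexRouter keeps
echo "postgres.public.todo" | sed -E 's/([^.]+)\.([^.]+)\.([^.]+)/\3/'
# prints: todo
```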
To check your topic list, the Kafka Connect container also ships with a CLI, the `kafka-topics.sh` script located in its `/bin` directory. Open a terminal in our Kafka Connect container and run:

```sh
./bin/kafka-topics.sh --bootstrap-server=kafka:9092 --list
```
The `--bootstrap-server` param can also be replaced with `--zookeeper` if you prefer.

Another tool we can use is Kafkacat, which I also include in the `docker-compose.yml` file. To use it, we simply run:

```sh
docker exec kafkacat kafkacat -b kafka:9092 -L   # list the existing topics in kafka:9092
```
Monitor change detection by tailing the topic
Now, we need to check whether the source connector is actually able to stream the data changes. There are several ways to do this.

Via Schema Registry
We can use `kafka-avro-console-consumer` by running:

```sh
docker-compose exec schema-registry /usr/bin/kafka-avro-console-consumer --bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema.registry.url=http://schema-registry:8081 --topic todo
```
or, if you prefer, via bash:

- Open a `schema-registry` bash session:

```sh
docker exec -it <container-name> sh
```

- then run:

```sh
/usr/bin/kafka-avro-console-consumer --bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema.registry.url=http://schema-registry:8081 --topic todo
```
Via Kafkacat
Or we can also use `kafkacat`:

```sh
docker exec kafkacat kafkacat -b kafka:9092 -t todo -C
```
Checking if the connector works
Now that we are tailing the topic, we can confirm the connector works by modifying data, inserting new data, or even deleting data.
Try to insert new data:

```sql
INSERT INTO public.todo (id, created_date, is_done, modified_date, title) VALUES('6fc341f0-04a8-4c91-92bb-3dbb1ef0b557'::uuid, '2022-06-20 22:11:09.613', false, '2022-06-20 22:52:26.648', 'test');
```
This is what we get in the topic monitoring console:

```json
{ "id": "6fc341f0-04a8-4c91-92bb-3dbb1ef0b557" }
{
  "before": null,
  "after": {
    "postgres.public.todo.Value": {
      "id": "6fc341f0-04a8-4c91-92bb-3dbb1ef0b557",
      "created_date": { "long": 1655763069613000 },
      "is_done": { "boolean": false },
      "modified_date": { "long": 1655765546648000 },
      "title": { "string": "test" }
    }
  },
  "source": {
    "version": "1.4.2.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1657617691850,
    "snapshot": { "string": "false" },
    "db": "sharingcdc",
    "schema": "public",
    "table": "todo",
    "txId": { "long": 493 },
    "lsn": { "long": 23876504 },
    "xmin": null
  },
  "op": "c",
  "ts_ms": { "long": 1657617692088 },
  "transaction": null
}
```
Check the schema created by Debezium
```sh
curl localhost:8081/subjects/todo-value/versions/1 | jq ".schema | fromjson"
```
This REST API will show us the schema as below.
{ "type": "record", "name": "Envelope", "namespace": "postgres.public.todo", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "id", "type": { "type": "string", "connect.version": 1, "connect.name": "io.debezium.data.Uuid" } }, { "name": "created_date", "type": [ "null", { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTimestamp" } ], "default": null }, { "name": "is_done", "type": [ "null", "boolean" ], "default": null }, { "name": "modified_date", "type": [ "null", { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTimestamp" } ], "default": null }, { "name": "title", "type": [ "null", "string" ], "default": null } ], "connect.name": "postgres.public.todo.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.postgresql", "fields": [ { "name": "version", "type": "string" }, { "name": "connector", "type": "string" }, { "name": "name", "type": "string" }, { "name": "ts_ms", "type": "long" }, { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" }, { "name": "db", "type": "string" }, { "name": "schema", "type": "string" }, { "name": "table", "type": "string" }, { "name": "txId", "type": [ "null", "long" ], "default": null }, { "name": "lsn", "type": [ "null", "long" ], "default": null }, { "name": "xmin", "type": [ "null", "long" ], "default": null } ], "connect.name": "io.debezium.connector.postgresql.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], "default": null }, { "name": "transaction", "type": [ "null", { "type": "record", "name": "ConnectDefault", "namespace": "io.confluent.connect.avro", "fields": [ { "name": "id", "type": "string" }, { "name": "total_order", "type": "long" }, { "name": "data_collection_order", "type": "long" } ] } ], "default": null } ], "connect.name": "postgres.public.todo.Envelope" }
sink connector

After we have the source (provider) side set up, it's time to consume the messages and replicate all the data into another Postgres database.

Same as with the source connector, we need to send a configuration to the connector endpoint to create the sink connector. Here, we're using the `JdbcSinkConnector` class, which we installed manually while building the Docker image (see the Troubleshooting section below).

`debezium-sink-config.json` (for the consumer / sink connector):

```json
{
  "name": "sharingcdc-sink-connector",
  "config": {
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "delete.enabled": "true",
    "tasks.max": "1",
    "topics": "todo",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
```
Send the payload through `curl`:

```sh
curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-sink-config.json"
```
It will give us this:

```
HTTP/1.1 201 Created
Date: Tue, 12 Jul 2022 06:43:15 GMT
Location: http://localhost:8083/connectors/sharingcdc-sink-connector
Content-Type: application/json
Content-Length: 539
Server: Jetty(9.4.43.v20210629)

{
  "name": "sharingcdc-sink-connector",
  "config": {
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "tasks.max": "1",
    "topics": "postgres.public.todo",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "name": "sharingcdc-sink-connector"
  },
  "tasks": [],
  "type": "sink"
}
```
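It's worth checking the sink connector's status the same way we did for the source, since a task can fail a few seconds after creation (see the Troubleshooting section):

```sh
curl -s localhost:8083/connectors/sharingcdc-sink-connector/status | jq
```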
Now, everything should be running normally. If so, let's connect to the second Postgres DB and check if it works 🤞

```sh
docker exec -it pg2 sh              # run bash in pg2
psql -U mine -d sharingcdc-copy -W  # log in to the sharingcdc-copy database
```
After submitting the password, you can run:
```sql
\d    -- check all relations
```
We should see that the table `todo` was automatically created for us, even though we never created it in this database:

```
sharingcdc-copy=# \d
        List of relations
 Schema | Name | Type  | Owner
--------+------+-------+-------
 public | todo | table | mine
(1 row)

sharingcdc-copy=# select * from todo;
 is_done |                  id                  |   created_date   |  modified_date   |    title
---------+--------------------------------------+------------------+------------------+--------------
 f       | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
 f       | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
 f       | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
(3 rows)
```
Let's try to insert new data into our first database:
```
sharingcdc=# INSERT INTO public.todo (id, created_date, is_done, modified_date, title) VALUES('850c5925-bb49-4795-9c19-4b7a6b4514b5'::uuid, '2022-06-20 22:12:02.335', false, NULL, 'Test12');
INSERT 0 1
```
Then confirm it in the second database:
```
sharingcdc-copy=# select * from todo;
 is_done |                  id                  |   created_date   |  modified_date   |    title
---------+--------------------------------------+------------------+------------------+--------------
 f       | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
 f       | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
 f       | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
 f       | 850c5925-bb49-4795-9c19-4b7a6b4514b5 | 1655763122335000 |                  | Test12
(4 rows)
```
Voila! 🎉🎉🎉
You can also see the activity logged in the Schema Registry console consumer:
{ "id":"850c5925-bb49-4795-9c19-4b7a6b4514b5" } { "before":null, "after":{ "postgres.public.todo.Value":{ "id":"850c5925-bb49-4795-9c19-4b7a6b4514b5", "created_date":{ "long":1655763122335000 }, "is_done":{ "boolean":false }, "modified_date":null, "title":{ "string":"Test12" } } }, "source":{ "version":"1.4.2.Final", "connector":"postgresql", "name":"postgres", "ts_ms":1657618002239, "snapshot":{ "string":"false" }, "db":"sharingcdc", "schema":"public", "table":"todo", "txId":{ "long":495 }, "lsn":{ "long":23877952 }, "xmin":null }, "op":"c", "ts_ms":{ "long":1657618002694 }, "transaction":null }
The source code of this project is also available in my GitHub repo here.
Troubleshooting
Kafka JDBC Connector not available in Debezium Connector
The latest Kafka Connect image does not include the JDBC Sink Connector we use. This is indicated by the following message when we create the connector using the config above.
```
HTTP/1.1 500 Internal Server Error
Date: Mon, 11 Jul 2022 06:44:12 GMT
Content-Type: application/json
Content-Length: 4581
Server: Jetty(9.4.33.v20201020)

{
  "error_code": 500,
  "message": "Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class <snip> }"
}
```
Since the connector does not come out of the box, we need to install it manually. Solving this takes a bit more work, though.
Solution
After some tweaking around, I found that an official Debezium developer (Jiri Pechanec) has shown how to build our own custom Debezium Connect image with the JDBC Sink Connector included.
In that `Dockerfile` example, the image used is from `quay.io/debezium/`; however, I decided to keep using the image from the first tutorial I followed. Here is my custom `Dockerfile`:

```dockerfile
FROM debezium/connect:1.4
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.3.2

# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar

# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
```
Let's build it:
```sh
docker build -t dbz-conn-jdbc-sink .
```
Note the `dbz-conn-jdbc-sink` tag above: it is the image used for the `debezium` service in my `docker-compose.yml` file. If you happen to follow this tutorial, build the image yourself before running the `docker-compose.yml` file.
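Once the stack is running with this custom image, you can confirm that the JDBC sink plugin is actually picked up by the Connect worker:

```sh
# io.confluent.connect.jdbc.JdbcSinkConnector should appear among the installed plugins
curl -s localhost:8083/connector-plugins | grep -i jdbc
```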
sink connector failed to run
Primary key mode must be record_key when delete support is enabled

When creating the sink connector, for a couple of seconds the status endpoint tells us that it is running. However, when I checked again about 10 seconds later, wondering why the data was not being inserted into the new database, I got:

```json
{
  "name": "sharingcdc-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.29.0.8:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.29.0.8:8083",
      "trace": "org.apache.kafka.common.config.ConfigException: Primary key mode must be 'record_key' when delete support is enabled <snip>"
    }
  ],
  "type": "sink"
}
```
I found out that I was using `"pk.mode": "record_value"` instead of `record_key`. So simply change the value to `record_key` and we're good to go.
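If you prefer not to delete and recreate the connector, another option is to PUT the corrected config onto the existing connector and then restart the failed task. A sketch using the standard Kafka Connect REST API:

```sh
# Update the existing connector's config in place (body is the bare config map)
curl -i -X PUT -H "Content-Type: application/json" \
  localhost:8083/connectors/sharingcdc-sink-connector/config \
  -d '{
        "auto.create": "true",
        "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "delete.enabled": "true",
        "tasks.max": "1",
        "topics": "todo",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
      }'

# Restart the failed task so it picks up the new config
curl -X POST localhost:8083/connectors/sharingcdc-sink-connector/tasks/0/restart
```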
{ "name":"sharingcdc-sink-connector", "connector":{ "state":"RUNNING", "worker_id":"172.29.0.8:8083" }, "tasks":[{ "id":0, "state":"FAILED", "worker_id":"172.29.0.8:8083", "trace":"org.apache.kafka.connect.errors.ConnectException: <snip> Caused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to postgres:5434 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections. <snip>" }], "type":"sink" }
Be careful when using `localhost` in Docker: `localhost` inside a container and `localhost` on the host machine are different things. In my case, I need the connection to reach the host machine's `localhost`. Defining a static IP would solve the problem, but my IP changes periodically, so I use the `host.docker.internal` hostname to point to the host's `localhost`.

This hostname is also used in my `debezium-sink-config.json` file:

```json
{
  ...
  "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123"
  ...
}
```
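To quickly check whether the Connect container can actually reach that address, you can test the TCP connection from inside it. Note that `host.docker.internal` resolves out of the box on Docker Desktop; on Linux you may need to add `extra_hosts: ["host.docker.internal:host-gateway"]` to the service:

```sh
# From inside the Connect container, test the TCP connection to the second Postgres
docker exec dbz bash -c 'timeout 3 bash -c "</dev/tcp/host.docker.internal/5434" && echo reachable || echo unreachable'
```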
Closing
Now we have learned how to stream our database changes to a Kafka topic and take advantage of it to replicate the data to another database. More examples are coming on this CDC topic.
Hope this helps! 👋