Debezium for CDC : Postgres to Postgres

Published: July 20, 2022
Author: Chandra Wijaya
Tags: Java

Topology

```
+--------------+
|              |
|  PostgreSQL  |
|              |
+------+-------+
       |
       |
+------v---------------------------+
|                                  |
|          Kafka Connect           |
|    (Debezium, JDBC connectors)   |
|                                  |
+---------------+------------------+
                |
                |
        +-------v--------+
        |                |
        |   PostgreSQL   |
        |                |
        +----------------+
```

Set up Docker services using a docker-compose file

I'm using a Docker environment for this; however, if you want to try it on your host machine, you can use the Confluent Platform installed on your OS.
```yaml
version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    container_name: pg1
    expose:
      - 5433
    ports:
      - 5433:5433
    environment:
      - POSTGRES_USER=mine
      - POSTGRES_PASSWORD=qwepoi123
      - POSTGRES_DB=sharingcdc
      - PGPORT=5433
  postgres-copy:
    image: debezium/postgres:13
    container_name: pg2
    expose:
      - 5434
    ports:
      - 5434:5434
    environment:
      - POSTGRES_USER=mine
      - POSTGRES_PASSWORD=qwepoi123
      - POSTGRES_DB=sharingcdc-copy
      - PGPORT=5434
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    container_name: kafka
    links:
      - zookeeper
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092
  debezium:
    image: dbz-conn-jdbc-sink
    container_name: dbz
    links:
      - kafka
      - postgres
      - postgres-copy
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect-status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    container_name: schema
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]
  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    entrypoint:
      - /bin/sh
      - -c
      - |
        while [ 1 -eq 1 ];do sleep 60;done
```
docker-compose.yml

Setup Database

First, we need the first database to have some data. Open a bash shell in pg1 (as we named the container) and log in:

```shell
psql -U mine -d sharingcdc -W
```

The -W option prompts us for the password. Once we are in psql, create the table:
```sql
CREATE TABLE public.todo (
    id uuid NOT NULL,
    created_date timestamp NULL DEFAULT now(),
    is_done bool NULL,
    modified_date timestamp NULL,
    title varchar(255) NULL,
    CONSTRAINT todo_pkey PRIMARY KEY (id)
);
```
To enable CDC with Postgres, we need to set the table's replica identity to FULL, so that update and delete events carry the complete row:

```sql
ALTER TABLE public.todo REPLICA IDENTITY FULL;
```
Let's insert some data into it:

```sql
INSERT INTO public.todo (id, created_date, is_done, modified_date, title)
VALUES ('79fe5ffa-94ed-4871-a5cd-300471586914'::uuid, '2022-05-31 14:47:12.198', false, '2022-06-20 22:52:10.648', 'do laundry');

INSERT INTO public.todo (id, created_date, is_done, modified_date, title)
VALUES ('129eef91-8f55-4edd-9c63-804c6f1a3f5b'::uuid, '2022-05-31 14:59:58.150', false, '2022-06-20 22:52:11.481', 'feed the dog');
```

Creating Connectors

Next, we need to establish a connector using Debezium to access our first database and stream its changes to a Kafka topic. Fortunately, that's quite easy: Kafka Connect exposes a REST API for managing connectors. There are two connectors we need to create: a source connector, which streams changes into a topic, and a sink connector, which reads data back out of the topic.
source connector
To create the source connector, we need to use the PostgresConnector class. Below is an example config for the publisher / source connector.

```json
{
  "name": "sharingcdc-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5433",
    "database.user": "mine",
    "database.password": "qwepoi123",
    "database.dbname": "sharingcdc",
    "database.server.name": "postgres",
    "database.include.list": "sharingcdc",
    "tasks.max": 1,
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
```
debezium-source-config.json
Finally, we can use curl to create the connector:

```shell
curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-source-config.json"
```
The response we get will be:

```
HTTP/1.1 201 Created
Date: Tue, 12 Jul 2022 09:09:28 GMT
Location: http://localhost:8083/connectors/sharingcdc-connector
Content-Type: application/json
Content-Length: 608
Server: Jetty(9.4.33.v20201020)

{
  "name": "sharingcdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5433",
    "database.user": "mine",
    "database.password": "qwepoi123",
    "database.dbname": "sharingcdc",
    "database.server.name": "postgres",
    "database.include.list": "sharingcdc",
    "tasks.max": "1",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "name": "sharingcdc-connector"
  },
  "tasks": [],
  "type": "source"
}
```

The HTTP status 201 Created tells us the connector was successfully created.
Check connector status
```shell
curl localhost:8083/connectors/sharingcdc-connector/status
```

This gives the result:

```json
{
  "name": "sharingcdc-connector",
  "connector": { "state": "RUNNING", "worker_id": "172.21.0.6:8083" },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "172.21.0.6:8083" }
  ],
  "type": "source"
}
```
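If you script these health checks, a small helper can catch failed tasks early. Below is a sketch in Python; the `connector_is_healthy` name is my own, and the only assumption is that the status payload has the shape shown above:

```python
import json

def connector_is_healthy(status_json: str) -> bool:
    """Return True only if the connector AND every task report RUNNING."""
    status = json.loads(status_json)
    if status["connector"]["state"] != "RUNNING":
        return False
    return all(task["state"] == "RUNNING" for task in status["tasks"])

# The status payload shown above
payload = """
{
  "name": "sharingcdc-connector",
  "connector": { "state": "RUNNING", "worker_id": "172.21.0.6:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "172.21.0.6:8083" } ],
  "type": "source"
}
"""
print(connector_is_healthy(payload))  # True
```

Checking the tasks, not just the connector state, matters: a connector can report RUNNING while one of its tasks has FAILED, which is exactly what happens in the troubleshooting cases later in this post.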
Important!
If you want to stop a connector, it is important to delete it before starting a new connector with the same name. In the example above, we declared the connector name as sharingcdc-connector; to delete this connector we simply send a DELETE request to the API.
```shell
curl -X DELETE localhost:8083/connectors/sharingcdc-connector
```
To confirm that the connector has been deleted, list the connectors we have:

```shell
curl -X GET localhost:8083/connectors
```
The /connectors endpoint is the root resource for managing connectors, and sending a GET request without any parameters returns the list of existing connectors.
Checking your topic
According to the official documentation of the Debezium connector for Postgres (Postgres Topic Names), the topic name is generated automatically as server name + schema name + table name. In our case, since we defined database.server.name in debezium-source-config.json as postgres, our schema name is public, and the table is todo, the constructed topic name would be postgres.public.todo. That should be it, right? Nope! If you look closely at our debezium-source-config.json, we defined a route transformation that rewrites the generated topic name to just the table name via the $3 replacement.
```json
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
```
debezium-source-config.json
So in our case, the name of our topic should be todo instead of postgres.public.todo.
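We can sanity-check that routing outside Kafka Connect. The Python snippet below applies the same regex; the only translation needed is that Java's `$3` backreference is written `\3` in Python:

```python
import re

# The same pattern RegexRouter is configured with: server.schema.table
pattern = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")

# Java's "$3" replacement corresponds to r"\3" in Python:
# keep only the third capture group, i.e. the table name
topic = pattern.sub(r"\3", "postgres.public.todo")
print(topic)  # todo
```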
To check your topic list, Kafka also ships a CLI: the kafka-topics.sh file located in the /bin directory. Open a terminal in our Kafka Connect container and run:

```shell
./bin/kafka-topics.sh --bootstrap-server=kafka:9092 --list
```
💡 The --bootstrap-server param can also be replaced with --zookeeper if you prefer (note that --zookeeper is deprecated and removed in newer Kafka versions).
Another tool we can use is Kafkacat, which I also included in the docker-compose.yml file. To use it, we simply run:

```shell
docker exec kafkacat kafkacat -b kafka:9092 -L   # list existing topics in kafka:9092
```
Monitor change detection by tailing the topic
Now, we need to check whether the source connector is actually able to stream the data changes. There are several ways to do this:
Via Schema Registry
We can use kafka-avro-console-consumer by running:

```shell
docker-compose exec schema-registry /usr/bin/kafka-avro-console-consumer --bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema.registry.url=http://schema-registry:8081 --topic postgres.public.todo
```
or, if you prefer, via bash:

  • Open a shell in the schema-registry container:

```shell
docker exec -it <container-name> sh
```

  • then run:

```shell
./usr/bin/kafka-avro-console-consumer --bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema.registry.url=http://schema-registry:8081 --topic postgres.public.todo
```
Via Kafkacat
Or we can use kafkacat:

```shell
docker exec kafkacat kafkacat -b kafka:9092 -t todo -C
```
Checking if the connector works
Now that we are tailing the topic, we can confirm the connector works by inserting new data, modifying it, or even deleting it.
Try to insert new data:

```sql
INSERT INTO public.todo (id, created_date, is_done, modified_date, title)
VALUES ('6fc341f0-04a8-4c91-92bb-3dbb1ef0b557'::uuid, '2022-06-20 22:11:09.613', false, '2022-06-20 22:52:26.648', 'test');
```
What we will get in the topic monitoring console:

```json
{ "id": "6fc341f0-04a8-4c91-92bb-3dbb1ef0b557" }
{
  "before": null,
  "after": {
    "postgres.public.todo.Value": {
      "id": "6fc341f0-04a8-4c91-92bb-3dbb1ef0b557",
      "created_date": { "long": 1655763069613000 },
      "is_done": { "boolean": false },
      "modified_date": { "long": 1655765546648000 },
      "title": { "string": "test" }
    }
  },
  "source": {
    "version": "1.4.2.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1657617691850,
    "snapshot": { "string": "false" },
    "db": "sharingcdc",
    "schema": "public",
    "table": "todo",
    "txId": { "long": 493 },
    "lsn": { "long": 23876504 },
    "xmin": null
  },
  "op": "c",
  "ts_ms": { "long": 1657617692088 },
  "transaction": null
}
```
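Note how the after image wraps each nullable column in an Avro union of the form `{"type": value}`. As an illustration of consuming such events, the Python sketch below unwraps a trimmed copy of the event above into a plain dict; the `after_state` helper and the trimming are my own, not part of Debezium:

```python
import json

# A trimmed copy of the change event shown above
event_json = """
{
  "before": null,
  "after": {
    "postgres.public.todo.Value": {
      "id": "6fc341f0-04a8-4c91-92bb-3dbb1ef0b557",
      "is_done": { "boolean": false },
      "title": { "string": "test" }
    }
  },
  "op": "c"
}
"""

def after_state(event: dict) -> dict:
    """Unwrap the {"type": value} unions in the after image into plain values."""
    value = event["after"]["postgres.public.todo.Value"]
    return {k: (next(iter(v.values())) if isinstance(v, dict) else v)
            for k, v in value.items()}

event = json.loads(event_json)
print(event["op"])         # "c" = create (insert); "u" and "d" are update/delete
print(after_state(event))  # plain dict with id, is_done, title
```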
Check the schema created by Debezium
```shell
curl localhost:8081/subjects/todo-value/versions/1 | jq ".schema | fromjson"
```

This REST call shows us the schema below.
```json
{
  "type": "record",
  "name": "Envelope",
  "namespace": "postgres.public.todo",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            { "name": "id", "type": { "type": "string", "connect.version": 1, "connect.name": "io.debezium.data.Uuid" } },
            { "name": "created_date", "type": [ "null", { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTimestamp" } ], "default": null },
            { "name": "is_done", "type": [ "null", "boolean" ], "default": null },
            { "name": "modified_date", "type": [ "null", { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTimestamp" } ], "default": null },
            { "name": "title", "type": [ "null", "string" ], "default": null }
          ],
          "connect.name": "postgres.public.todo.Value"
        }
      ],
      "default": null
    },
    { "name": "after", "type": [ "null", "Value" ], "default": null },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.postgresql",
        "fields": [
          { "name": "version", "type": "string" },
          { "name": "connector", "type": "string" },
          { "name": "name", "type": "string" },
          { "name": "ts_ms", "type": "long" },
          { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" },
          { "name": "db", "type": "string" },
          { "name": "schema", "type": "string" },
          { "name": "table", "type": "string" },
          { "name": "txId", "type": [ "null", "long" ], "default": null },
          { "name": "lsn", "type": [ "null", "long" ], "default": null },
          { "name": "xmin", "type": [ "null", "long" ], "default": null }
        ],
        "connect.name": "io.debezium.connector.postgresql.Source"
      }
    },
    { "name": "op", "type": "string" },
    { "name": "ts_ms", "type": [ "null", "long" ], "default": null },
    {
      "name": "transaction",
      "type": [
        "null",
        {
          "type": "record",
          "name": "ConnectDefault",
          "namespace": "io.confluent.connect.avro",
          "fields": [
            { "name": "id", "type": "string" },
            { "name": "total_order", "type": "long" },
            { "name": "data_collection_order", "type": "long" }
          ]
        }
      ],
      "default": null
    }
  ],
  "connect.name": "postgres.public.todo.Envelope"
}
```
sink connector
After the provider config is set up, it's time to consume the messages and replicate the data into another Postgres database.

As with the source connector, we need to send a configuration to the connector endpoint to create the sink connector. Here we use the JdbcSinkConnector class, which we installed manually while building the Docker image.
debezium-sink-config.json (for the consumer / sink connector):

```json
{
  "name": "sharingcdc-sink-connector",
  "config": {
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "delete.enabled": "true",
    "tasks.max": "1",
    "topics": "todo",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
```
debezium-sink-config.json
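With "insert.mode": "upsert" and "pk.mode": "record_key", the sink writes each record so that replays and updates land on the same primary key instead of producing duplicate rows; for Postgres that boils down to an INSERT ... ON CONFLICT ... DO UPDATE statement. The sketch below is my own illustration of that statement shape (the `build_upsert` helper is hypothetical, not the connector's actual code):

```python
def build_upsert(table: str, columns: list[str], pk: str) -> str:
    """Illustrative Postgres upsert, similar in shape to what a JDBC sink
    in upsert mode issues (parameter placeholders left as %s)."""
    cols = ", ".join(columns)
    params = ", ".join(["%s"] * len(columns))
    # Update every non-key column from the incoming (EXCLUDED) row
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != pk)
    return (f"INSERT INTO {table} ({cols}) VALUES ({params}) "
            f"ON CONFLICT ({pk}) DO UPDATE SET {updates}")

# The generated statement inserts, or on a duplicate id, updates the row
print(build_upsert("todo", ["id", "is_done", "title"], "id"))
```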
Send the payload through curl:

```shell
curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-sink-config.json"
```
It gives us this:

```
HTTP/1.1 201 Created
Date: Tue, 12 Jul 2022 06:43:15 GMT
Location: http://localhost:8083/connectors/sharingcdc-sink-connector
Content-Type: application/json
Content-Length: 539
Server: Jetty(9.4.43.v20210629)

{
  "name": "sharingcdc-sink-connector",
  "config": {
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "tasks.max": "1",
    "topics": "postgres.public.todo",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "name": "sharingcdc-sink-connector"
  },
  "tasks": [],
  "type": "sink"
}
```
Now, everything should be running normally. If so, let's connect to the second Postgres database and check if it works 🤞
```shell
docker exec -it pg2 sh                # open a shell in pg2
psql -U mine -d sharingcdc-copy -W    # log in to the sharingcdc-copy database
```
After submitting the password, you can run:

```
\d  -- check all relations
```
We should see that the table todo has been automatically created for us, even though we never created it ourselves.
```
sharingcdc-copy=# \d
        List of relations
 Schema | Name | Type  | Owner
--------+------+-------+-------
 public | todo | table | mine
(1 row)

sharingcdc-copy=# select * from todo;
 is_done |                  id                  |   created_date   |  modified_date   |    title
---------+--------------------------------------+------------------+------------------+--------------
 f       | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
 f       | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
 f       | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
(3 rows)
```
Let's try to insert new data into our first database:

```
sharingcdc=# INSERT INTO public.todo (id, created_date, is_done, modified_date, title)
             VALUES ('850c5925-bb49-4795-9c19-4b7a6b4514b5'::uuid, '2022-06-20 22:12:02.335', false, NULL, 'Test12');
INSERT 0 1
```
Then confirm it in the second database:

```
sharingcdc-copy=# select * from todo;
 is_done |                  id                  |   created_date   |  modified_date   |    title
---------+--------------------------------------+------------------+------------------+--------------
 f       | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
 f       | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
 f       | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
 f       | 850c5925-bb49-4795-9c19-4b7a6b4514b5 | 1655763122335000 |                  | Test12
(4 rows)
```
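Note that created_date and modified_date arrive as raw epoch-microsecond integers: the Avro schema earlier typed them as io.debezium.time.MicroTimestamp, and the sink's auto-created table stores them as plain numbers rather than Postgres timestamps. A quick Python conversion confirms the values are intact:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def from_micro_timestamp(micros: int) -> datetime:
    """Convert a Debezium MicroTimestamp (microseconds since the Unix
    epoch) back to a datetime, using integer-safe arithmetic."""
    return EPOCH + timedelta(microseconds=micros)

# created_date of the 'test' row replicated above
print(from_micro_timestamp(1655763069613000))  # 2022-06-20 22:11:09.613000+00:00
```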
Voila! 🎉🎉🎉
You could also see the activity logged in the schema registry:

```json
{ "id": "850c5925-bb49-4795-9c19-4b7a6b4514b5" }
{
  "before": null,
  "after": {
    "postgres.public.todo.Value": {
      "id": "850c5925-bb49-4795-9c19-4b7a6b4514b5",
      "created_date": { "long": 1655763122335000 },
      "is_done": { "boolean": false },
      "modified_date": null,
      "title": { "string": "Test12" }
    }
  },
  "source": {
    "version": "1.4.2.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1657618002239,
    "snapshot": { "string": "false" },
    "db": "sharingcdc",
    "schema": "public",
    "table": "todo",
    "txId": { "long": 495 },
    "lsn": { "long": 23877952 },
    "xmin": null
  },
  "op": "c",
  "ts_ms": { "long": 1657618002694 },
  "transaction": null
}
```
The source code of this project is also available in my GitHub repo.

Troubleshooting

Kafka JDBC Connector not available in Debezium Connector

The latest Kafka Connect image does not include the JDBC Sink Connector we use. This is indicated by the following message when we start the connector using the config above.
```
HTTP/1.1 500 Internal Server Error
Date: Mon, 11 Jul 2022 06:44:12 GMT
Content-Type: application/json
Content-Length: 4581
Server: Jetty(9.4.33.v20201020)

{
  "error_code": 500,
  "message": "Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class <snip> }"
}
```
Since the image does not ship with the connector out of the box, we need to install it manually, which takes a bit of extra work.
Solution
Digging around, I found that Debezium developer Jiri Pechanec has shown how to build a custom Debezium Connect image with the JDBC Sink Connector included. In that `Dockerfile` example, the base image used is quay.io/debezium/; however, I decided to keep using the image from the first tutorial I followed.
Here is my custom Dockerfile:

```dockerfile
FROM debezium/connect:1.4

ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.3.2

# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar

# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
```
Let's build it:

```shell
docker build -t dbz-conn-jdbc-sink .
```
Note the dbz-conn-jdbc-sink tag above; it is the image used for the debezium service in my docker-compose.yml file. If you are following this tutorial, build this image yourself before running the docker-compose.yml file.

sink connector failed to run

Primary key mode must be record_key when delete support is enabled
When creating the sink connector, the status endpoint reports RUNNING for the first couple of seconds. However, checking again after about 10 seconds, wondering why the data was not inserted into the new database, I found the task had failed:
```json
{
  "name": "sharingcdc-sink-connector",
  "connector": { "state": "RUNNING", "worker_id": "172.29.0.8:8083" },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.29.0.8:8083",
      "trace": "org.apache.kafka.common.config.ConfigException: Primary key mode must be 'record_key' when delete support is enabled <snip>"
    }
  ],
  "type": "sink"
}
```
It turned out I was using "pk.mode": "record_value" instead of record_key. Simply change the value to record_key and we're good to go.

Can not establish connection to database

```json
{
  "name": "sharingcdc-sink-connector",
  "connector": { "state": "RUNNING", "worker_id": "172.29.0.8:8083" },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.29.0.8:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: <snip> Caused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to postgres:5434 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections. <snip>"
    }
  ],
  "type": "sink"
}
```
Be careful when using localhost in Docker: localhost inside a container is not the same as localhost on the host machine. In my case, I needed to connect to the host machine's localhost. Defining a static IP would solve the problem, but my IP changes periodically, so I used the host.docker.internal hostname, which points to the host's localhost.

This hostname is also used in my debezium-sink-config.json file:

```json
{
  ...
  "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
  ...
}
```

Closing

We have now learned how to stream database changes to a Kafka topic and take advantage of that stream to replicate to another database. More examples are coming on this CDC topic. Hope this helps! 👋