A BLOG POST

Debezium for CDC : Postgres to Postgres

Ever wondered how to duplicate your database in real time without pain? There is Change Data Capture which we will combine with Debezium to stream the data changes into Kafka topic and make use of it. This post uses Postgres to Postgres databases for the example.

Debezium for CDC : Postgres to Postgres image
chandrawijaya image
chandrawijayaPublished at   7/20/2022, 9:25:00 AM

Updated at 7/19/2024, 7:04:33 PM

Read  304 times

sh

1                   +--------------+
2                   |              |
3                   |  PostgreSQL  |
4                   |              |
5                   +------+-------+
6                          |
7                          |
8                          |
9          +---------------v------------------+
10          |                                  |
11          |           Kafka Connect          |
12          |  (Debezium, JDBC connectors)     |
13          |                                  |
14          +---------------+------------------+
15                          |
16                          |
17                          |
18                          |
19                  +-------v--------+
20                  |                |
21                  |   PostgreSQL   |
22                  |                |
23                  +----------------+

I'm using Docker environment to do this, however if you want to try it on host machine, you can use Confluent Platform installed on your OS.

yaml

1version: "3.7"
2services:
3  postgres:
4    image: debezium/postgres:13
5    container_name: pg1
6    expose:
7      - 5433
8    ports:
9      - 5433:5433
10    environment:
11      - POSTGRES_USER=mine
12      - POSTGRES_PASSWORD=qwepoi123
13      - POSTGRES_DB=sharingcdc
14      - PGPORT=5433
15
16  postgres-copy:
17    image: debezium/postgres:13
18    container_name: pg2
19    expose:
20      - 5434
21    ports:
22      - 5434:5434
23    environment:
24      - POSTGRES_USER=mine
25      - POSTGRES_PASSWORD=qwepoi123
26      - POSTGRES_DB=sharingcdc-copy
27      - PGPORT=5434
28
29  zookeeper:
30    image: confluentinc/cp-zookeeper:5.5.3
31    container_name: zookeeper
32    environment:
33      ZOOKEEPER_CLIENT_PORT: 2181
34
35  kafka:
36    image: confluentinc/cp-enterprise-kafka:5.5.3
37    container_name: kafka
38    links:
39      - zookeeper 
40    depends_on: [zookeeper]
41    environment:
42      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
43      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
44      KAFKA_BROKER_ID: 1
45      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
46      KAFKA_JMX_PORT: 9991
47    ports:
48      - 9092:9092
49
50  debezium:
51    image: dbz-conn-jdbc-sink
52    container_name: dbz
53    links:
54     - kafka
55     - postgres
56     - postgres-copy
57    environment:
58      BOOTSTRAP_SERVERS: kafka:9092
59      GROUP_ID: 1
60      CONFIG_STORAGE_TOPIC: connect_configs
61      OFFSET_STORAGE_TOPIC: connect_offsets
62      STATUS_STORAGE_TOPIC: connect-status
63      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
64      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
65      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
66      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
67    depends_on: [kafka]
68    ports:
69      - 8083:8083
70
71  schema-registry:
72    image: confluentinc/cp-schema-registry:5.5.3
73    container_name: schema
74    environment:
75      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
76      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
77      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
78    ports:
79      - 8081:8081
80    depends_on: [zookeeper, kafka]
81  
82  kafkacat:
83    image: edenhill/kafkacat:1.5.0
84    container_name: kafkacat
85    entrypoint: 
86      - /bin/sh 
87      - -c 
88      - |
89        while [ 1 -eq 1 ];do sleep 60;done

First, we need the first database to have some data. Open pg1 (as we name it) bash and login:

sh

1psql -U mine -d sharingcdc -W

-W option will prompt us to input password to connect. Once we are in the psql mode, create table:

mysql

1CREATE TABLE public.todo (
2	id uuid NOT NULL,
3	created_date timestamp NULL DEFAULT now(),
4	is_done bool NULL,
5	modified_date timestamp NULL,
6	title varchar(255) NULL,
7	CONSTRAINT todo_pkey PRIMARY KEY (id)
8);

To enable CDC with Postgres, we need to give replication permission to our database:

mysql

1ALTER TABLE public.todo REPLICA IDENTITY FULL;

Let's insert some data in it:

mysql

1INSERT INTO public.todo
2(id, created_date, is_done, modified_date, title)
3VALUES('79fe5ffa-94ed-4871-a5cd-300471586914'::uuid, '2022-05-31 14:47:12.198', false, '2022-06-20 22:52:10.648', 'do laundry');
4INSERT INTO public.todo
5(id, created_date, is_done, modified_date, title)
6VALUES('129eef91-8f55-4edd-9c63-804c6f1a3f5b'::uuid, '2022-05-31 14:59:58.150', false, '2022-06-20 22:52:11.481', 'feed the dog');

Next, we need to establish a connector using Debezium connector to access our first database and stream it to a Kafka topic. Fortunately, it's quite easy to do that. Debezium connector can be accessed through REST API.

There are two connectors we need to create, one is for source, the process when stream are inserted into a topic, and the other one is sink, which taking out data from a topic.

To create source connector, we need to use the PostgresConnector class. Below is an example configs on how to do it.

debezium-source-config.json (for publisher / source connector)

json

1{
2  "name": "sharingcdc-source-connector",
3  "config": {
4    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
5    "plugin.name": "pgoutput",
6    "database.hostname": "postgres",
7    "database.port": "5433",
8    "database.user": "mine",
9    "database.password": "qwepoi123",
10    "database.dbname": "sharingcdc",
11    "database.server.name": "postgres",
12    "database.include.list":"sharingcdc",
13    "tasks.max": 1,
14    "transforms": "route",
15    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
16    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
17    "transforms.route.replacement": "$3"
18  }
19}

Finally, we can just use curl to start creating the connector.

sh

1curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-source-config.json"

The response we get will be:

json

1HTTP/1.1 201 Created
2Date: Tue, 12 Jul 2022 09:09:28 GMT
3Location: http://localhost:8083/connectors/sharingcdc-connector
4Content-Type: application/json
5Content-Length: 608
6Server: Jetty(9.4.33.v20201020)
7
8{
9  "name":"sharingcdc-connector",
10  "config":{
11    "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
12    "plugin.name":"pgoutput",
13    "database.hostname":"postgres",
14    "database.port":"5433",
15    "database.user":"mine",
16    "database.password":"qwepoi123",
17    "database.dbname":"sharingcdc",
18    "database.server.name":"postgres",
19    "database.include.list":"sharingcdc",
20    "tasks.max":"1",
21    "transforms":"route",
22    "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
23    "transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)",
24    "transforms.route.replacement":"$3",
25    "name":"sharingcdc-connector"
26  },
27  "tasks":[],
28  "type":"source"
29}

Notice the HTTP status 201 Created tells us the connection is successfully created.

sh

1curl localhost:8083/connectors/sharingcdc-connector/status

This will give result:

json

1{
2  "name":"sharingcdc-connector",
3  "connector":{
4    "state":"RUNNING",
5    "worker_id":"172.21.0.6:8083"
6  },
7  "tasks":[
8    {
9      "id":0,
10      "state":"RUNNING",
11      "worker_id":"172.21.0.6:8083"
12    }
13  ],
14  "type":"source"
15}

If you want to stop our connector, it is important to delete it before starting new connector with the same name.

For example above, we declared that the connector name is sharingcdc-connector, to delete this connector we simply send a DELETE to the API.

sh

1curl -X DELETE localhost:8083/connectors/sharingcdc-connector


To confirm that the connector has been deleted, inquiry the connectors that we have using:

sh

1curl -X GET localhost:8083/connectors


The /connectors endpoint is the root context for Debezium connectors, and sending a GET request without any parameters returns the list of available connectors.

According to the official documentation of Debezium connector for postgres : Postgres Topic Names, the topic name is generated automatically using the server name + schema name + table name.

So in our case, since we defined the database.server.name in the debezium-config.json file as postgres, and our schema name is public and the table is todo, the constructed topic name will be: postgres.public.todo.

That should be it right? Nope!

If you see correctly in our debezium-source-config.json, we defined a transformation route which transforms the predefined generated topic into just table name with $3 value.

json

1    "transforms": "route",
2    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
3    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
4    "transforms.route.replacement": "$3"

So in our case, to name of our topic should be `todo` instead of postgres.public.todo

To check your topic list, Kafka Connect also provide a CLI to check our topics using the kafka-topics.sh file located in the /bin directory.

Open terminal in our Kafka Connect container, and run:

sh

1./bin/kafka-topics.sh --bootstrap-server=kafka:9092 --list
--bootstrap-server param can also be replaced with --zookeeper if you prefer.

Another tool we can use is Kafkacat which I also include in the docker-compose.yml file. To use it, we simply run:

sh

1docker exec kafkacat kafkacat -b kafka:9092 -L # List existing topic in kafka:9092

Now, we need to check whether the source connector is actually able to stream the data changes. There are several ways to do this:

We can use kafka-avro-console-consumer, by running:

sh

1docker-compose exec schema-registry /usr/bin/kafka-avro-console-consumer --bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema-registry.url=http://schema-registry:8081 --topic postgres.public.todo

or if you prefer via bash :

  • Open schema-registry bash:

sh

1docker exec -it <container-name> sh
  • then run

sh

1./usr/bin/kafka-avro-console-consumer -bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema-registry.url=http://schema-registry:8081 --topic postgres.public.todo

Or we can also use kafkacat:

sh

1docker exec kafkacat kafkacat -b kafka:9092 -t todo -C

You can change todo with the topic name you want to tail.

Now that we have tailed our connector, we can confirm by trying to modify data, insert new data or even delete data.


Try to insert new data

mysql

1INSERT INTO public.todo
2(id, created_date, is_done, modified_date, title)
3VALUES('6fc341f0-04a8-4c91-92bb-3dbb1ef0b557'::uuid, '2022-06-20 22:11:09.613', false, '2022-06-20 22:52:26.648', 'test');

What we will get in the topic monitoring console

json

1{
2  "id":"6fc341f0-04a8-4c91-92bb-3dbb1ef0b557"
3}   
4{
5  "before":null,
6  "after":{
7    "postgres.public.todo.Value":{
8      "id":"6fc341f0-04a8-4c91-92bb-3dbb1ef0b557",
9      "created_date":{
10        "long":1655763069613000
11      },
12      "is_done":{
13        "boolean":false
14      },
15      "modified_date":{
16        "long":1655765546648000
17      },
18      "title":{
19        "string":"test"
20      }
21    }
22  },
23  "source":{
24    "version":"1.4.2.Final",
25    "connector":"postgresql",
26    "name":"postgres",
27    "ts_ms":1657617691850,
28    "snapshot":{
29      "string":"false"
30    },
31    "db":"sharingcdc",
32    "schema":"public",
33    "table":"todo",
34    "txId":{
35      "long":493
36    },
37    "lsn":{
38      "long":23876504
39    },
40    "xmin":null
41  },
42  "op":"c",
43  "ts_ms":{
44    "long":1657617692088
45  },
46  "transaction":null
47}

sh

1curl localhost:8081/subjects/todo-value/versions/1 | jq ".schema | fromjson"

This REST API will show us the schema as below.

json

1{
2  "type": "record",
3  "name": "Envelope",
4  "namespace": "postgres.public.todo",
5  "fields": [
6    {
7      "name": "before",
8      "type": [
9        "null",
10        {
11          "type": "record",
12          "name": "Value",
13          "fields": [
14            {
15              "name": "id",
16              "type": {
17                "type": "string",
18                "connect.version": 1,
19                "connect.name": "io.debezium.data.Uuid"
20              }
21            },
22            {
23              "name": "created_date",
24              "type": [
25                "null",
26                {
27                  "type": "long",
28                  "connect.version": 1,
29                  "connect.name": "io.debezium.time.MicroTimestamp"
30                }
31              ],
32              "default": null
33            },
34            {
35              "name": "is_done",
36              "type": [
37                "null",
38                "boolean"
39              ],
40              "default": null
41            },
42            {
43              "name": "modified_date",
44              "type": [
45                "null",
46                {
47                  "type": "long",
48                  "connect.version": 1,
49                  "connect.name": "io.debezium.time.MicroTimestamp"
50                }
51              ],
52              "default": null
53            },
54            {
55              "name": "title",
56              "type": [
57                "null",
58                "string"
59              ],
60              "default": null
61            }
62          ],
63          "connect.name": "postgres.public.todo.Value"
64        }
65      ],
66      "default": null
67    },
68    {
69      "name": "after",
70      "type": [
71        "null",
72        "Value"
73      ],
74      "default": null
75    },
76    {
77      "name": "source",
78      "type": {
79        "type": "record",
80        "name": "Source",
81        "namespace": "io.debezium.connector.postgresql",
82        "fields": [
83          {
84            "name": "version",
85            "type": "string"
86          },
87          {
88            "name": "connector",
89            "type": "string"
90          },
91          {
92            "name": "name",
93            "type": "string"
94          },
95          {
96            "name": "ts_ms",
97            "type": "long"
98          },
99          {
100            "name": "snapshot",
101            "type": [
102              {
103                "type": "string",
104                "connect.version": 1,
105                "connect.parameters": {
106                  "allowed": "true,last,false"
107                },
108                "connect.default": "false",
109                "connect.name": "io.debezium.data.Enum"
110              },
111              "null"
112            ],
113            "default": "false"
114          },
115          {
116            "name": "db",
117            "type": "string"
118          },
119          {
120            "name": "schema",
121            "type": "string"
122          },
123          {
124            "name": "table",
125            "type": "string"
126          },
127          {
128            "name": "txId",
129            "type": [
130              "null",
131              "long"
132            ],
133            "default": null
134          },
135          {
136            "name": "lsn",
137            "type": [
138              "null",
139              "long"
140            ],
141            "default": null
142          },
143          {
144            "name": "xmin",
145            "type": [
146              "null",
147              "long"
148            ],
149            "default": null
150          }
151        ],
152        "connect.name": "io.debezium.connector.postgresql.Source"
153      }
154    },
155    {
156      "name": "op",
157      "type": "string"
158    },
159    {
160      "name": "ts_ms",
161      "type": [
162        "null",
163        "long"
164      ],
165      "default": null
166    },
167    {
168      "name": "transaction",
169      "type": [
170        "null",
171        {
172          "type": "record",
173          "name": "ConnectDefault",
174          "namespace": "io.confluent.connect.avro",
175          "fields": [
176            {
177              "name": "id",
178              "type": "string"
179            },
180            {
181              "name": "total_order",
182              "type": "long"
183            },
184            {
185              "name": "data_collection_order",
186              "type": "long"
187            }
188          ]
189        }
190      ],
191      "default": null
192    }
193  ],
194  "connect.name": "postgres.public.todo.Envelope"
195}

Isn't that great?

After we have all the provider config set up, now it's time to consume the message and replicate all the data into another PostgresDB.


Same with creating source connector, we need set configurations to be sent to the connector endpoint to create sink connector. Here, we're using JdbcSinkConnector class which installed manually while creating Docker image.


debezium-sink-config.json (for consumer / sink connector)


json

1{
2  "name": "sharingcdc-sink-connector",
3  "config": {
4    "auto.create": "true",
5    "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
6    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
7    "insert.mode": "upsert",
8    "pk.fields": "id",
9    "pk.mode": "record_key",
10    "delete.enabled": "true",
11    "tasks.max": "1",
12    "topics": "todo",
13    "transforms": "unwrap",
14    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
15    "transforms.unwrap.drop.tombstones": "false"
16  }
17}

Send the payload through curl:

sh

1curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-sink-config.json"

It will gives us this:

json

1HTTP/1.1 201 Created
2Date: Tue, 12 Jul 2022 06:43:15 GMT
3Location: http://localhost:8083/connectors/sharingcdc-sink-connector
4Content-Type: application/json
5Content-Length: 539
6Server: Jetty(9.4.43.v20210629)
7
8{
9  "name":"sharingcdc-sink-connector",
10  "config":{
11    "auto.create":"true",
12    "connection.url":"jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
13    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
14    "insert.mode":"upsert",
15    "pk.fields":"id",
16    "pk.mode":"record_value",
17    "tasks.max":"1",
18    "topics":"postgres.public.todo",
19    "transforms":"unwrap",
20    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
21    "transforms.unwrap.drop.tombstones":"false",
22    "name":"sharingcdc-sink-connector"
23  },
24  "tasks":[],
25  "type":"sink"
26}

Now, everything should be running normally. If so, let's try to connect to the second PostgreDB and check if it works 🤞

sh

1docker exec -it pg2 sh #run bash of pg2
2
3psql -U mine -d sharingcdc-copy -W #login to sharingcdc-copy database

After submitting the password, you can run:

sh

1\d --check all relations

We should see that table todo automatically created for us, which we never created it before.

sh

1sharingcdc-copy=# \d
2       List of relations
3 Schema | Name | Type  | Owner
4--------+------+-------+-------
5 public | todo | table | mine
6(1 row)
7
8
9sharingcdc-copy=# select * from todo;
10 is_done |                  id                  |   created_date   |  modified_date   |    title
11---------+--------------------------------------+------------------+------------------+--------------
12 f       | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
13 f       | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
14 f       | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
15(3 rows)

Let's try to insert new data into our first database:

mysql

1sharingcdc=# INSERT INTO public.todo
2(id, created_date, is_done, modified_date, title)
3VALUES('850c5925-bb49-4795-9c19-4b7a6b4514b5'::uuid, '2022-06-20 22:12:02.335', false, NULL, 'Test12');
4INSERT 0 1

Then confirm it in the second database:

mysql

1sharingcdc-copy=# select * from todo;
2 is_done |                  id                  |   created_date   |  modified_date   |    title
3---------+--------------------------------------+------------------+------------------+--------------
4 f       | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
5 f       | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
6 f       | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
7 f       | 850c5925-bb49-4795-9c19-4b7a6b4514b5 | 1655763122335000 |                  | Test12
8(4 rows)

Voila! 🎉🎉🎉


You could also see the activity logged in the schema registry:

json

1{
2  "id":"850c5925-bb49-4795-9c19-4b7a6b4514b5"
3}   
4{
5  "before":null,
6  "after":{
7    "postgres.public.todo.Value":{
8      "id":"850c5925-bb49-4795-9c19-4b7a6b4514b5",
9      "created_date":{
10        "long":1655763122335000
11      },
12      "is_done":{
13        "boolean":false
14      },
15      "modified_date":null,
16      "title":{
17        "string":"Test12"
18      }
19    }
20  },
21  "source":{
22    "version":"1.4.2.Final",
23    "connector":"postgresql",
24    "name":"postgres",
25    "ts_ms":1657618002239,
26    "snapshot":{
27      "string":"false"
28    },
29    "db":"sharingcdc",
30    "schema":"public",
31    "table":"todo",
32    "txId":{
33      "long":495
34    },
35    "lsn":{
36      "long":23877952
37    },
38    "xmin":null
39  },
40  "op":"c",
41  "ts_ms":{
42    "long":1657618002694
43  },
44  "transaction":null
45}


Source code of this project is also available in my Github repo here.

The latest Kafka connect image does not include JBDC Sink Connector we use. This is indicated with this message when we start the connector using the config above.

json

1HTTP/1.1 500 Internal Server Error
2Date: Mon, 11 Jul 2022 06:44:12 GMT
3Content-Type: application/json
4Content-Length: 4581
5Server: Jetty(9.4.33.v20201020)
6
7{
8  "error_code":500,
9  "message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class <snip> }"
10}

Since it did not come out of the box with the connectors, we need to install it manually. Solving it needs a bit more trick tho.


Solution

Tweaking around I found out that actually official Debezium developer (Jiri Pechanec) provided how to build our own custom Debezium connector with JDBC Sink Connector included.

Source


In that `Dockerfile` example, the connector used is quay.io/debezium/, however I decided to keep using the first tutorial I followed.


Here is my custom Dockerfile:

batchfile

1FROM debezium/connect:1.4
2ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
3
4ARG POSTGRES_VERSION=42.2.8
5ARG KAFKA_JDBC_VERSION=5.3.2
6
7# Deploy PostgreSQL JDBC Driver
8RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar
9
10# Deploy Kafka Connect JDBC
11RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
12curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar


Let's build it:

sh

1docker build -t dbz-conn-jdbc-sink .

Note that dbz-conn-jdbc-sink tag above. It is the image that used in my docker-compose.yml file for debezium service.

If you happen to follow this tutorial, you can build the image yourself before running the docker-compose.yml file.

When creating the sink connector, for couple seconds the status endpoint tells us that it is running. However, I tried checking it after 10 seconds wondering why the data was not inserted into the new database.

json

1{
2  "name":"sharingcdc-sink-connector",
3 "connector":{
4   "state":"RUNNING",
5   "worker_id":"172.29.0.8:8083"
6 },
7 "tasks":[{
8   "id":0,
9   "state":"FAILED",
10   "worker_id":"172.29.0.8:8083",
11   "trace":"org.apache.kafka.common.config.ConfigException: Primary key mode must be 'record_key' when delete support is enabled <snip>"}],
12 "type":"sink"
13}

I found out that I'm using "pk.mode":"record_value" instead of record_key. So just simply change the value with record_key and we're good to go.

json

1{
2  "name":"sharingcdc-sink-connector",
3 "connector":{
4   "state":"RUNNING",
5   "worker_id":"172.29.0.8:8083"
6 },
7 "tasks":[{
8   "id":0,
9   "state":"FAILED",
10   "worker_id":"172.29.0.8:8083",
11   "trace":"org.apache.kafka.connect.errors.ConnectException: <snip> Caused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to postgres:5434 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections. <snip>"
12 }],
13 "type":"sink"
14}

Careful when using localhost in Docker. The term localhost in the host machine and in the Docker are different. In my case, I need to make the connection to the host machine localhost. Defining a static IP would solve the problem, but my IP is changed periodically, thus I need to use host.docker.internal keyword to point to the host localhost.

This keyword also used in my debezium-sink-config.json file:

json

1{
2  ...
3  "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123"
4  ...
5}

Now we have learned how to stream our database changes to Kafka topic and take advantage of it to replicate to another database. More examples are coming for this CDC topic.

Hope this helps! 👋

Youtube 1

Youtube 2

Official Debezium Github Example

Reactions


Comments


More articles

If you enjoyed this article, why not check my other posts?