Change Data Capture using Chango CDC

Kidong Lee
Dec 1, 2023

Incremental processing is a common way to move data between databases and other data sources in data engineering. However, because incremental processing usually runs in batch mode, we prefer CDC (Change Data Capture) where possible.

Chango CDC is an application that captures change data from databases and sends it to Chango. Internally, it uses embedded Debezium. Let’s get started with PostgreSQL CDC using Chango CDC.

Install Chango CDC

Download the pre-built Chango CDC distribution.

curl -L -O https://github.com/cloudcheflabs/chango-cdc/releases/download/1.0.0/chango-cdc-1.0.0-debezium-1.9.7.Final-linux-x64.tar.gz

Untar it and move to the Chango CDC directory.

tar zxvf chango-cdc-1.0.0-debezium-1.9.7.Final-linux-x64.tar.gz
cd chango-cdc-1.0.0-debezium-1.9.7.Final-linux-x64/

Configure Chango CDC

Modify conf/configuration.yml, for example as follows.

chango:
  token: eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJjZGM0MTFkYzIzMzg2NjU0ZGZkYTdjYjk4OTMzNjA1NWNiNyIsImV4cCI6MTcwNjY1OTE5OSwiaWF0IjoxNzAxMzU2NDEyfQ.-WjO6mpNV5QM5t1jwLmBD8tBuRNOxrzcREU6RqLJtHGD0u_TGi28NWG9lFYA-ZKQ-nDwGbr6Nf_MXaUeeO2VAw
  dataApiUrl: http://chango-private-1.chango.private:80
  schema: cdc_db
  table: student
  batchSize: 10000
  interval: 1000

debezium:
  connector: |-
    name=postgres-connector
    connector.class=io.debezium.connector.postgresql.PostgresConnector
    offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
    offset.storage.file.filename=/tmp/chango-cdc/offset-student.dat
    offset.flush.interval.ms=60000
    topic.prefix=cdc
    schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
    schema.history.internal.file.filename=/tmp/chango-cdc/schemahistory-student.dat
    database.server.name=postgresql-server
    database.server.id=pg-1
    database.hostname=localhost
    database.port=5432
    database.user=anyuser
    database.password=anypassword
    database.dbname=studentdb
    table.include.list=public.student

  • chango.token: Chango credential required to access the Chango Data API. See Get Chango Credential.
  • chango.dataApiUrl: Chango Data API endpoint URL.
  • chango.schema: Schema in the Iceberg catalog of Chango.
  • chango.table: Iceberg table in Chango.
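The sample chango.token has the three dot-separated segments of a JSON Web Token. Assume Chango credentials are JWTs carrying a standard exp claim; this is inferred only from the shape of the sample token. Under that assumption, a small sketch like the following can read the expiry time without verifying the signature. That helps when Chango CDC fails to authenticate because a credential has expired. The helpers token_expiry and is_expired are hypothetical names, not part of Chango.

```python
import base64
import json
from datetime import datetime, timezone
from typing import Optional

# Sample token copied from the configuration above.
SAMPLE_TOKEN = (
    "eyJhbGciOiJIUzUxMiJ9."
    "eyJzdWIiOiJjZGM0MTFkYzIzMzg2NjU0ZGZkYTdjYjk4OTMzNjA1NWNiNyIsImV4cCI6MTcwNjY1OTE5OSwiaWF0IjoxNzAxMzU2NDEyfQ."
    "-WjO6mpNV5QM5t1jwLmBD8tBuRNOxrzcREU6RqLJtHGD0u_TGi28NWG9lFYA-ZKQ-nDwGbr6Nf_MXaUeeO2VAw"
)


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without "=" padding; restore the padding first.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def token_expiry(token: str) -> datetime:
    """Hypothetical helper: read the 'exp' claim of a JWT as a UTC datetime.

    Assumes the credential is a JWT. The signature is NOT verified;
    only the payload segment is decoded.
    """
    _header, payload, _signature = token.split(".")
    claims = json.loads(_b64url_decode(payload))
    return datetime.fromtimestamp(claims["exp"], tz=timezone.utc)


def is_expired(token: str, now: Optional[datetime] = None) -> bool:
    # Compare the expiry against the given time, or the current UTC time.
    now = now or datetime.now(timezone.utc)
    return token_expiry(token) <= now


print(token_expiry(SAMPLE_TOKEN).isoformat())  # 2024-01-30T23:59:59+00:00
```

If is_expired returns True for your token, request a new credential before starting Chango CDC.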

In this example, CDC data is captured from PostgreSQL and sent to Chango.

  • debezium.connector: Properties of the Debezium PostgreSQL connector, written as key=value lines.

To capture changes from another database, consult Debezium Connectors.
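The debezium.connector value is a YAML block scalar (|-) containing Java-properties style key=value lines. As a rough sanity check before starting Chango CDC, the sketch below parses such a block with the Python standard library and reports keys that are missing.

It rests on several assumptions:
- The parser handles only simple key=value lines and #/! comments, not the full java.util.Properties grammar (continuations, escapes, : separators).
- REQUIRED_KEYS is an illustrative selection, not a list published by Chango or Debezium.

```python
from typing import Dict, List

# The connector block from conf/configuration.yml, as the YAML "|-" scalar yields it.
CONNECTOR_BLOCK = """\
name=postgres-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/chango-cdc/offset-student.dat
offset.flush.interval.ms=60000
topic.prefix=cdc
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=/tmp/chango-cdc/schemahistory-student.dat
database.server.name=postgresql-server
database.server.id=pg-1
database.hostname=localhost
database.port=5432
database.user=anyuser
database.password=anypassword
database.dbname=studentdb
table.include.list=public.student"""

# Illustrative selection of keys to check; not an official list from Chango or Debezium.
REQUIRED_KEYS = {
    "name", "connector.class", "database.hostname", "database.port",
    "database.user", "database.password", "database.dbname",
}


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines; blank lines and #/! comments are skipped.

    Simplified subset of java.util.Properties: no line continuations,
    escapes or ':' separators. Values may contain '=' (split on the first one).
    """
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {raw!r}")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props: Dict[str, str]) -> List[str]:
    # Sorted so the report is deterministic.
    return sorted(REQUIRED_KEYS.difference(props))


props = parse_properties(CONNECTOR_BLOCK)
print(props["connector.class"], missing_keys(props))
```

When switching to another database, the same check applies once REQUIRED_KEYS is adjusted to that connector's documented properties.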

Install PostgreSQL

PostgreSQL will run as a Docker container.

Assuming Docker and Docker Compose are installed on your machine, create a Docker Compose file.

cat <<EOF > docker-compose.yml
version: "3.5"

services:
  postgres:
    container_name: postgres
    image: debezium/postgres:9.6
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=studentdb
      - POSTGRES_USER=anyuser
      - POSTGRES_PASSWORD=anypassword
EOF

Run docker compose.

docker-compose up -d;

Then, enter the PostgreSQL container to create a table.

docker exec -it postgres sh;

Inside the container, connect to the PostgreSQL database.

psql -U anyuser -d studentdb;

Create the student table.

CREATE TABLE public.student
(
id integer NOT NULL,
address character varying(255),
email character varying(255),
name character varying(255),
CONSTRAINT student_pkey PRIMARY KEY (id)
);

Create Iceberg Table

You need to create the Iceberg table in Chango using a Trino client such as Superset.

-- create iceberg schema.
CREATE SCHEMA IF NOT EXISTS iceberg.cdc_db;


-- create iceberg table.
CREATE TABLE iceberg.cdc_db.student (
address varchar,
day varchar,
email varchar,
id bigint,
month varchar,
name varchar,
op varchar,
ts bigint,
year varchar
)
WITH (
partitioning=ARRAY['year', 'month', 'day'],
format = 'PARQUET'
);

In addition to the original columns of the PostgreSQL table, the columns year, month, day, ts and op are required for partitioning and small-file compaction. If the PostgreSQL table already has a column named year, month, day, ts or op, an underscore (_) is appended to that original column name.

NOTE: The table columns must be declared in ascending alphanumeric order of their lower-case names.
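The two rules above can be sketched in Python: add the extra columns, and declare all columns in ascending order. The sketch below builds the column list and a matching DDL statement under these assumptions:
- The caller supplies the mapping from PostgreSQL types to Trino types. The example maps integer to bigint, as the DDL above does.
- iceberg_columns and create_table_ddl are hypothetical names.
- The renaming rule for clashing columns is taken from the description above, not from Chango CDC's source.

```python
from typing import Dict, List, Tuple

# Extra columns and their types, as declared in the example DDL above.
EXTRA_COLUMNS = {"year": "varchar", "month": "varchar", "day": "varchar",
                 "ts": "bigint", "op": "varchar"}


def iceberg_columns(source: Dict[str, str]) -> List[Tuple[str, str]]:
    """Map {PostgreSQL column: Trino type} to the ordered Iceberg column list."""
    cols: Dict[str, str] = {}
    for name, trino_type in source.items():
        lower = name.lower()
        # A source column that clashes with an extra column gets "_" appended.
        if lower in EXTRA_COLUMNS:
            lower += "_"
        cols[lower] = trino_type
    cols.update(EXTRA_COLUMNS)
    # Columns must be in ascending order of their lower-case names.
    return sorted(cols.items())


def create_table_ddl(schema: str, table: str, source: Dict[str, str]) -> str:
    # Render a Trino CREATE TABLE statement shaped like the example above.
    body = ",\n".join(f"  {name} {trino_type}" for name, trino_type in iceberg_columns(source))
    return (
        f"CREATE TABLE iceberg.{schema}.{table} (\n{body}\n)\n"
        "WITH (\n  partitioning=ARRAY['year', 'month', 'day'],\n  format = 'PARQUET'\n);"
    )


student = {"id": "bigint", "address": "varchar", "email": "varchar", "name": "varchar"}
print(create_table_ddl("cdc_db", "student", student))
```

For the student table this yields the same column order as the DDL above: address, day, email, id, month, name, op, ts, year.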

Run Chango CDC

Move to the Chango CDC directory and start Chango CDC.

bin/start-chango-cdc.sh

You can check the log file /tmp/chango-cdc/chango-cdc.log.

If you want to stop Chango CDC, run the following.

bin/stop-chango-cdc.sh

Run PostgreSQL CUD Queries

In the PostgreSQL container, run the following CUD (create, update, delete) queries.

INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL) VALUES('1','Kidong Lee','Seoul','kidong@example.com');

UPDATE STUDENT SET EMAIL='kidong2@example.com', NAME='Kidong2 Lee' WHERE ID = 1;

DELETE FROM STUDENT WHERE ID = 1;
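Each of these statements should produce one change event. Debezium marks change events with an op code: c (create), u (update), d (delete) and r (read, for snapshot rows). The sketch below assumes the op column in the Iceberg table stores these Debezium codes; the post does not show the stored values. It replays change rows in ts order to rebuild the latest state per primary key. For the three statements above, student 1 ends up deleted. The row layout and ts values are hypothetical.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]

# CDC bookkeeping columns from the Iceberg table above; not part of the student record.
CDC_COLUMNS = {"op", "ts", "year", "month", "day"}


def latest_state(changes: Iterable[Row], key: str = "id") -> Dict[Any, Row]:
    """Replay change rows in ts order and keep the latest record per key.

    Assumes op holds Debezium codes: c/u/r upsert the row, d removes it.
    """
    state: Dict[Any, Row] = {}
    # sorted() is stable, so rows with equal ts keep their input order.
    for row in sorted(changes, key=lambda r: r["ts"]):
        op = row["op"]
        if op in ("c", "u", "r"):
            state[row[key]] = {k: v for k, v in row.items() if k not in CDC_COLUMNS}
        elif op == "d":
            state.pop(row[key], None)
        else:
            raise ValueError(f"unexpected op code: {op!r}")
    return state


# Hypothetical rows for the INSERT, UPDATE and DELETE statements above.
EVENTS = [
    {"id": 1, "name": "Kidong Lee", "address": "Seoul", "email": "kidong@example.com", "op": "c", "ts": 1000},
    {"id": 1, "name": "Kidong2 Lee", "address": "Seoul", "email": "kidong2@example.com", "op": "u", "ts": 2000},
    {"id": 1, "name": None, "address": None, "email": None, "op": "d", "ts": 3000},
]

print(latest_state(EVENTS[:2]))
print(latest_state(EVENTS))
```

Keeping every change row in Iceberg, rather than only the current state, is what makes this kind of replay possible later.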

Let’s check whether the CDC data has been saved in the Iceberg table in Chango by running the following query in Superset.

select *, from_unixtime(ts/1000) from iceberg.cdc_db.student order by ts desc;
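The query divides ts by 1000 before calling from_unixtime, which indicates that ts holds epoch milliseconds. The sketch below shows how year, month and day partition values could be derived from such a timestamp. It assumes UTC and zero-padded strings; the post does not say how Chango CDC fills these columns. partition_values is a hypothetical name.

```python
from datetime import datetime, timezone
from typing import Tuple


def partition_values(ts_millis: int) -> Tuple[str, str, str]:
    """Hypothetical: derive (year, month, day) strings from epoch milliseconds.

    Assumes UTC and zero-padded values; the post does not specify either.
    """
    # Drop the milliseconds; only the calendar date matters for partitioning.
    dt = datetime.fromtimestamp(ts_millis // 1000, tz=timezone.utc)
    return dt.strftime("%Y"), dt.strftime("%m"), dt.strftime("%d")


print(partition_values(1701356412000))  # ('2023', '11', '30')
```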

As shown above, Chango CDC does not need additional infrastructure such as Kafka or a Kafka Connect cluster to perform CDC, since it runs Debezium in embedded mode.

Visit Chango.
