top of page
Yazarın fotoğrafıMurat Can ÇOBAN

Kafka + Debezium ile CDC(Change Data Capture) (Ubuntu 22.04)

Güncelleme tarihi: 12 Haz 2023

CDC (Change Data Capture) Nedir?


INSERT, DELETE, UPDATE faaliyetlerinin doğrudan veritabanı tarafından izlenmesini ve kayıt altına alınmasını sağlar. Bu özelliğin en önemli avantajı doğrudan log altyapısını kullandığı için bizim oluşturacağımız algoritmalara göre daha performanslı olmasıdır. Ayrıca tüm satırı değil sadece değişikliğin yapıldığı kolonları alması da ergonomik bir yapı sağlamaktadır.


1- Ön Hazırlık Aşaması


Sunucu IP ve hostname bilgileri /etc/hosts dosyasına eklenir.

# ifconfig 
# hostname 
# nano /etc/hosts


Firewall'ı kapatıyoruz.

# ufw disable
# ufw status

Java kurulumunu gerçekleştiriyoruz.

# sudo apt update
# sudo apt install openjdk-11-jdk
# java -version


2- Kafka Bileşenleri Kurulumu


Confluent sitesinden dosyamızı indiriyoruz. Bu dosya içinde kullanacağımız tüm bileşenler bulunmaktadır.

# curl -O https://packages.confluent.io/archive/7.4/confluent-7.4.0.zip

İndirdiğimiz dosyayı zipten çıkarıyoruz ardından kafkanın dosyalarını bir dizin yaratarak bu dizine aktarıyoruz.

# unzip unzip confluent-7.4.0.zip
# mkdir -p /apps/confluent-kafka
# cp -rf confluent-7.4.0/* /apps/confluent-kafka/

Kafka user'ı oluşturarak bunu data4tech grubumuza ekliyoruz. Daha sonrasında kafkanın dosyalarını aktardığımız dizine recursive bir şekilde izinleri tanımlıyoruz.

# useradd -m kafka
# usermod -aG data4tech kafka
# chown -R kafka:data4tech /apps/confluent-kafka


3- Kafka Bileşenlerinin Konfigürasyon Ayarları ve Servis Oluşturulması


a)Zookeeper

Zookeeper konfigürasyonları ile başlıyoruz.

$ nano /apps/confluent-kafka/etc/kafka/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
audit.enable=true
initLimit=5
syncLimit=2
server.1=data4tech:2888:3888

Ardından zookeper için servis oluşturuyoruz.

# nano /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/zookeeper-server-start /apps/confluent-kafka/etc/kafka/zookeeper.properties
ExecStop=/apps/confluent-kafka/bin/zookeeper-server-stop
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

b)Kafka

Kafka konfigürasyonlarını yapıyoruz. Aşağıda sadece değişmesi gereken parametreleri yazıyorum. Diğer parametreleri default bırakabilirsiniz. Performans için gerekli araştırmaları yaparak değiştirebilirisiniz.

$ nano /apps/confluent-kafka/etc/kafka/server.properties

listeners=PLAINTEXT://data4tech:9092

Kafka servisi oluşturuyoruz.

# nano /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/kafka-server-start /apps/confluent-kafka/etc/kafka/server.properties
ExecStop=/apps/confluent-kafka/bin/kafka-server-stop
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

c)Kafka Connect

Kafka connect konfigürasyonlarını yapıyoruz. Aşağıda sadece değişmesi gereken parametreleri yazıyorum. Diğer parametreleri default bırakabilirsiniz. Performans için gerekli araştırmaları yaparak değiştirebilirisiniz.


$ nano /apps/confluent-kafka/etc/kafka/connect-distributed.properties

listeners=HTTP://data4tech:8083
bootstrap.servers=data4tech:9092

Kafka connect servisini oluşturuyoruz.

# /etc/systemd/system/kafka-connect.service
[Unit]
Requires=kafka.service
After=kafka.service

[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/connect-distributed /apps/confluent-kafka/etc/kafka/connect-distributed.properties
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

d) Kafka Schema-registry

Kafka schema-registry konfigürasyonlarını yapıyoruz. Aşağıda sadece değişmesi gereken parametreleri yazıyorum. Diğer parametreleri default bırakabilirsiniz. Performans için gerekli araştırmaları yaparak değiştirebilirisiniz.

$ /apps/confluent-kafka/etc/schema-registry/schema-registry.properties

listeners=http://data4tech:8081
kafkastore.bootstrap.servers=PLAINTEXT://data4tech:9092

Kafka schema-registry servisini oluşturuyoruz.

# nano /etc/systemd/system/kafka-schema-registry.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/schema-registry-start /apps/confluent-kafka/etc/schema-registry/schema-registry.properties
ExecStop=/apps/confluent-kafka/bin/schema-registry-stop
Restart=on-abnormal

[Install]
WantedBy=multi-user.target


4- Debezium Kafka-Connect Connector Kurulumları


- Debezium PostgreSQL connector indirmek için buraya tıklayınız.

- Debezium Sink JDBC connector indirmek için buraya tıklayınız.


İndirilen dosyaları zipten çıkarıyoruz.

# tar xvf debezium-connector-postgres-2.2.1.Final-plugin.tar.gz
# unzip confluentinc-kafka-connect-jdbc-10.7.2.zip

Zipten çıkarılan dosyaları atacağımız dizini oluşturuyoruz. Zipten çıkarılmış dosyaları buraya gönderiyoruz ve izinlerini veriyoruz.

# mkdir /apps/confluent-kafka/plugins
# mv confluentinc-kafka-connect-jdbc-10.7.2 /apps/confluent-kafka/plugins
# mv debezium-connector-postgres /apps/confluent-kafka/plugins
# chown -R kafka:data4tech /apps/confluent-kafka/plugins

Şimdi bu oluşturduğumuz plugins dizinini Kafka-Connect'in görmesi için gerekli konfigürasyon ayarını girmemiz gerekiyor.

$ /apps/confluent-kafka/etc/kafka/connect-distributed.properties

plugin.path=/usr/share/java,/apps/confluent-kafka/plugins


5- PostgreSQL Kurulumu


Aşağıdaki komutları kullanarak postgresql'i kuruyoruz.

# sudo apt update
# sudo apt install postgresql postgresql-contrib
# sudo systemctl start postgresql.service
# sudo systemctl enable postgresql.service
# sudo -i -u postgres

$ psql
ALTER SYSTEM SET wal_level = logical;

Gerekli konfigürasyon ayarlarını aşağıdaki gibi yapıyoruz. Ayarları tamamladıktan sonra postgresql servisimizi yeniden başlatıyoruz.

$ /etc/postgresql/14/main/postgresql.conf

wal_level= logical
listen_addresses = 'data4tech'


$ /etc/postgresql/14/main/pg_hba.conf

# IPv6 local connections:
host    all             all             all                   md5


# systemctl restart postgresql.service

5- Kadeck Kurulumu

Kadeck yapıcağımız CDC işlemleri için kullanacağımız bir arayüz. Şimdi docker ile kadeck'i başlatıyoruz.

docker run -d -p 80:80 -e xeotek_kadeck_free="data4tech" -e xeotek_kadeck_port=80 xeotek/kadeck:4.2.9

Daha sonrasında tarayıcı üzerinden arayüze giriş yapıyoruz.

http://data4tech:80

Kullanıcı adı ve şifremizi default olarak admin - admin olarak giriyoruz.


Giriş yaptıktan sonra isteğinize göre sol alttaki lamba tuşundan dark mode yapabilirsiniz.

Şimdi "Add Connection" diyerek kafka tarafında başlattığımız servisleri buraya ekleyeceğiz.


Apache Kafka'yı seçerek devam ediyoruz.


Aşağıdaki gibi broker konfigürasyonlarını girerek "Test Connection" butonuna tıklıyoruz.


"Test Connection" butonuna tıkladıktan sonra başarılı bir şekilde bağlandığımızı görüyoruz.


Şimdi ise Schema-Registry konfigürasyonlarımızı girerek "Test Connection" yapıyoruz.


"Test Connection" butonuna tıkladıktan sonra başarılı bir şekilde bağlandığımızı görüyoruz.


Şimdi ise Kafka Connect konfigürasyonlarımızı girmek için "Add Worker" butonuna tıklıyoruz. "Test Connection" yapıyoruz.


"Test Connection" butonuna tıkladıktan sonra başarılı bir şekilde bağlandığımızı görüyoruz.

Tüm bağlantılarımızı tamamladıktan sonra "Create" butonuna tıklayarak bağlantımızı oluşturuyoruz.


Bağlantı ana sayfamıza geldi şimdi ise "Connect" diyerek bağlanıp içine gireceğiz.


Sol taraftan "Kafka Connect" sekmesine geliyoruz.


Burada sağ taraftaki "Add Connection" butonu ile eklediğiniz plugin dosyalarını tanımlayacağız.


Eklediğimiz pluginler buraya gelmiş bulunmaktadır. Şimdi bir source oluşturmak için "CDC Postgres" seçerek ayarlarımızı gireceğiz.


Aşağıda paylaştığım json'ı buraya girerek "Confirm" butonuna tıklıyoruz.


{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.user": "postgres",
  "database.dbname": "postgres",
  "slot.name": "my_slot",
  "tasks.max": "1",
  "database.history.kafka.bootstrap.servers": "data4tech:9092",
  "database.history.kafka.topic": "schema-changes.bigdata",
  "database.server.name": "localhost",
  "database.port": "5432",
  "plugin.name": "pgoutput",
  "topic.prefix": "bigdata",
  "database.hostname": "data4tech",
  "database.password": "test123",
  "name": "debezium_source",
  "table.include.list": "public.source_tb",
  "database.whitelist": "postgres",
  "value.converter.schema.registry.url": "http://data4tech:8081",
  "key.converter.schema.registry.url": "http://data4tech:8081",
  "value.converter.schemas.enable": "true",
  "key.converter.schemas.enable": "true",
  "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.add.fields":"op",
  "transforms.unwrap.delete.handling.mode":"rewrite",
  "transforms.unwrap.drop.tombstones":"false"
}

Şimdi bir sink oluşturmak için 2. Sıradaki JDBCSinkConnector'ü ("JDBC") seçerek ayarlarımızı gireceğiz.


Aşağıda paylaştığım json'ı buraya girerek "Confirm" butonuna tıklıyoruz.

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "table.name.format": "public.sink_tb",
  "connection.password": "test123",
  "tasks.max": "1",
  "topics": "bigdata.public.source_tb",
  "key.converter.schemas.enable": "true",
  "delete.enabled": "true",
  "connection.user": "postgres",
  "name": "debezium_sink",
  "value.converter.schemas.enable": "true",
  "auto.create": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connection.url": "jdbc:postgresql://data4tech:5432/postgres",
  "insert.mode": "upsert",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "pk.mode": "record_key",
  "pk.fields": "id",
  "snapshot.mode": "initial",
  "key.converter.schema.registry.url": "http://data4tech:8081",
  "value.converter.schema.registry.url": "http://data4tech:8081"
}

Bu oluşturduğumuz connector'leri ise aşağıdaki gibi görebilirsiniz.




6- CDC

Terminalden tekrar postgreqsl tarafına geçiş yaparak tablolarımızı oluşturacağız. Bir source tablosu oluşturuyoruz.

CREATE TABLE public.source_tb (
	id serial PRIMARY KEY,
	name VARCHAR (50) ,
	surname VARCHAR (50) ,
	job VARCHAR (50)
);

Sink tablomuzu JDBCSinkConnector tarafında auto.create ile oluştur komutu verdiğimiz için manuel oluşturmamıza gerek kalmıyor.



"INSERT" komutu ile başlayalım.

INSERT INTO public.source_tb VALUES(1,'micheal','dan','doctor');
INSERT INTO public.source_tb VALUES(2,'john','oliver','engineer');
INSERT INTO public.source_tb VALUES(3,'sam','curtis','police');

Gördüğünüz gibi source tablosuna attığımız insert komutları sink tablomuza da yazılmış durumda.


"UPDATE" komutu ile devam edelim.


Son olarak "DELETE" komutunu kullanalım.

Bütün komutlarımızı kullanarak source - sink tablolarımızda uyumu gördük.



Bu yaptığımız değişikliklerdeki kafka mesajlarını ise "Data Browser" sekmesindeki topic'lerimize bakıyoruz ve oluşturduğumuz topic'i görüyoruz. İçeriğine girerek bu mesajları görebiliriz.


Bu yazımda Kafka - Debezium ile CDC operasyonlarını anlattım.



Gelecek yazılarımızda görüşmek üzere, sağlıcakla kalın.

505 görüntüleme0 yorum

Son Yazılar

Hepsini Gör

Comments

Rated 0 out of 5 stars.
No ratings yet

Add a rating
bottom of page