CDC (Change Data Capture) Nedir?
INSERT, DELETE, UPDATE faaliyetlerinin doğrudan veritabanı tarafından izlenmesini ve kayıt altına alınmasını sağlar. Bu özelliğin en önemli avantajı doğrudan log altyapısını kullandığı için bizim oluşturacağımız algoritmalara göre daha performanslı olmasıdır. Ayrıca tüm satırı değil sadece değişikliğin yapıldığı kolonları alması da ergonomik bir yapı sağlamaktadır.
1- Ön Hazırlık Aşaması
Sunucu IP ve hostname bilgileri /etc/hosts dosyasına eklenir.
# ifconfig
# hostname
# nano /etc/hosts
Firewall'ı kapatıyoruz.
# ufw disable
# ufw status
Java kurulumunu gerçekleştiriyoruz.
# sudo apt update
# sudo apt install openjdk-11-jdk
# java -version
2- Kafka Bileşenleri Kurulumu
Confluent sitesinden dosyamızı indiriyoruz. Bu dosya içinde kullanacağımız tüm bileşenler bulunmaktadır.
# curl -O https://packages.confluent.io/archive/7.4/confluent-7.4.0.zip
İndirdiğimiz dosyayı zipten çıkarıyoruz ardından kafkanın dosyalarını bir dizin yaratarak bu dizine aktarıyoruz.
# unzip unzip confluent-7.4.0.zip
# mkdir -p /apps/confluent-kafka
# cp -rf confluent-7.4.0/* /apps/confluent-kafka/
Kafka user'ı oluşturarak bunu data4tech grubumuza ekliyoruz. Daha sonrasında kafkanın dosyalarını aktardığımız dizine recursive bir şekilde izinleri tanımlıyoruz.
# useradd -m kafka
# usermod -aG data4tech kafka
# chown -R kafka:data4tech /apps/confluent-kafka
3- Kafka Bileşenlerinin Konfigürasyon Ayarları ve Servis Oluşturulması
a)Zookeeper
Zookeeper konfigürasyonları ile başlıyoruz.
$ nano /apps/confluent-kafka/etc/kafka/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
audit.enable=true
initLimit=5
syncLimit=2
server.1=data4tech:2888:3888
Ardından zookeper için servis oluşturuyoruz.
# nano /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/zookeeper-server-start /apps/confluent-kafka/etc/kafka/zookeeper.properties
ExecStop=/apps/confluent-kafka/bin/zookeeper-server-stop
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
b)Kafka
Kafka konfigürasyonlarını yapıyoruz. Aşağıda sadece değişmesi gereken parametreleri yazıyorum. Diğer parametreleri default bırakabilirsiniz. Performans için gerekli araştırmaları yaparak değiştirebilirisiniz.
$ nano /apps/confluent-kafka/etc/kafka/server.properties
listeners=PLAINTEXT://data4tech:9092
Kafka servisi oluşturuyoruz.
# nano /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/kafka-server-start /apps/confluent-kafka/etc/kafka/server.properties
ExecStop=/apps/confluent-kafka/bin/kafka-server-stop
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
c)Kafka Connect
Kafka connect konfigürasyonlarını yapıyoruz. Aşağıda sadece değişmesi gereken parametreleri yazıyorum. Diğer parametreleri default bırakabilirsiniz. Performans için gerekli araştırmaları yaparak değiştirebilirisiniz.
$ nano /apps/confluent-kafka/etc/kafka/connect-distributed.properties
listeners=HTTP://data4tech:8083
bootstrap.servers=data4tech:9092
Kafka connect servisini oluşturuyoruz.
# /etc/systemd/system/kafka-connect.service
[Unit]
Requires=kafka.service
After=kafka.service
[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/connect-distributed /apps/confluent-kafka/etc/kafka/connect-distributed.properties
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
d) Kafka Schema-registry
Kafka schema-registry konfigürasyonlarını yapıyoruz. Aşağıda sadece değişmesi gereken parametreleri yazıyorum. Diğer parametreleri default bırakabilirsiniz. Performans için gerekli araştırmaları yaparak değiştirebilirisiniz.
$ /apps/confluent-kafka/etc/schema-registry/schema-registry.properties
listeners=http://data4tech:8081
kafkastore.bootstrap.servers=PLAINTEXT://data4tech:9092
Kafka schema-registry servisini oluşturuyoruz.
# nano /etc/systemd/system/kafka-schema-registry.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/schema-registry-start /apps/confluent-kafka/etc/schema-registry/schema-registry.properties
ExecStop=/apps/confluent-kafka/bin/schema-registry-stop
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
4- Debezium Kafka-Connect Connector Kurulumları
- Debezium PostgreSQL connector indirmek için buraya tıklayınız.
- Debezium Sink JDBC connector indirmek için buraya tıklayınız.
İndirilen dosyaları zipten çıkarıyoruz.
# tar xvf debezium-connector-postgres-2.2.1.Final-plugin.tar.gz
# unzip confluentinc-kafka-connect-jdbc-10.7.2.zip
Zipten çıkarılan dosyaları atacağımız dizini oluşturuyoruz. Zipten çıkarılmış dosyaları buraya gönderiyoruz ve izinlerini veriyoruz.
# mkdir /apps/confluent-kafka/plugins
# mv confluentinc-kafka-connect-jdbc-10.7.2 /apps/confluent-kafka/plugins
# mv debezium-connector-postgres /apps/confluent-kafka/plugins
# chown -R kafka:data4tech /apps/confluent-kafka/plugins
Şimdi bu oluşturduğumuz plugins dizinini Kafka-Connect'in görmesi için gerekli konfigürasyon ayarını girmemiz gerekiyor.
$ /apps/confluent-kafka/etc/kafka/connect-distributed.properties
plugin.path=/usr/share/java,/apps/confluent-kafka/plugins
5- PostgreSQL Kurulumu
Aşağıdaki komutları kullanarak postgresql'i kuruyoruz.
# sudo apt update
# sudo apt install postgresql postgresql-contrib
# sudo systemctl start postgresql.service
# sudo systemctl enable postgresql.service
# sudo -i -u postgres
$ psql
ALTER SYSTEM SET wal_level = logical;
Gerekli konfigürasyon ayarlarını aşağıdaki gibi yapıyoruz. Ayarları tamamladıktan sonra postgresql servisimizi yeniden başlatıyoruz.
$ /etc/postgresql/14/main/postgresql.conf
wal_level= logical
listen_addresses = 'data4tech'
$ /etc/postgresql/14/main/pg_hba.conf
# IPv6 local connections:
host all all all md5
# systemctl restart postgresql.service
5- Kadeck Kurulumu
Kadeck yapıcağımız CDC işlemleri için kullanacağımız bir arayüz. Şimdi docker ile kadeck'i başlatıyoruz.
docker run -d -p 80:80 -e xeotek_kadeck_free="data4tech" -e xeotek_kadeck_port=80 xeotek/kadeck:4.2.9
Daha sonrasında tarayıcı üzerinden arayüze giriş yapıyoruz.
http://data4tech:80
Kullanıcı adı ve şifremizi default olarak admin - admin olarak giriyoruz.
Giriş yaptıktan sonra isteğinize göre sol alttaki lamba tuşundan dark mode yapabilirsiniz.
Şimdi "Add Connection" diyerek kafka tarafında başlattığımız servisleri buraya ekleyeceğiz.
Apache Kafka'yı seçerek devam ediyoruz.
Aşağıdaki gibi broker konfigürasyonlarını girerek "Test Connection" butonuna tıklıyoruz.
"Test Connection" butonuna tıkladıktan sonra başarılı bir şekilde bağlandığımızı görüyoruz.
Şimdi ise Schema-Registry konfigürasyonlarımızı girerek "Test Connection" yapıyoruz.
"Test Connection" butonuna tıkladıktan sonra başarılı bir şekilde bağlandığımızı görüyoruz.
Şimdi ise Kafka Connect konfigürasyonlarımızı girmek için "Add Worker" butonuna tıklıyoruz. "Test Connection" yapıyoruz.
"Test Connection" butonuna tıkladıktan sonra başarılı bir şekilde bağlandığımızı görüyoruz.
Tüm bağlantılarımızı tamamladıktan sonra "Create" butonuna tıklayarak bağlantımızı oluşturuyoruz.
Bağlantı ana sayfamıza geldi şimdi ise "Connect" diyerek bağlanıp içine gireceğiz.
Sol taraftan "Kafka Connect" sekmesine geliyoruz.
Burada sağ taraftaki "Add Connection" butonu ile eklediğiniz plugin dosyalarını tanımlayacağız.
Eklediğimiz pluginler buraya gelmiş bulunmaktadır. Şimdi bir source oluşturmak için "CDC Postgres" seçerek ayarlarımızı gireceğiz.
Aşağıda paylaştığım json'ı buraya girerek "Confirm" butonuna tıklıyoruz.
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.user": "postgres",
"database.dbname": "postgres",
"slot.name": "my_slot",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "data4tech:9092",
"database.history.kafka.topic": "schema-changes.bigdata",
"database.server.name": "localhost",
"database.port": "5432",
"plugin.name": "pgoutput",
"topic.prefix": "bigdata",
"database.hostname": "data4tech",
"database.password": "test123",
"name": "debezium_source",
"table.include.list": "public.source_tb",
"database.whitelist": "postgres",
"value.converter.schema.registry.url": "http://data4tech:8081",
"key.converter.schema.registry.url": "http://data4tech:8081",
"value.converter.schemas.enable": "true",
"key.converter.schemas.enable": "true",
"transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields":"op",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.unwrap.drop.tombstones":"false"
}
Şimdi bir sink oluşturmak için 2. Sıradaki JDBCSinkConnector'ü ("JDBC") seçerek ayarlarımızı gireceğiz.
Aşağıda paylaştığım json'ı buraya girerek "Confirm" butonuna tıklıyoruz.
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"table.name.format": "public.sink_tb",
"connection.password": "test123",
"tasks.max": "1",
"topics": "bigdata.public.source_tb",
"key.converter.schemas.enable": "true",
"delete.enabled": "true",
"connection.user": "postgres",
"name": "debezium_sink",
"value.converter.schemas.enable": "true",
"auto.create": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connection.url": "jdbc:postgresql://data4tech:5432/postgres",
"insert.mode": "upsert",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"pk.mode": "record_key",
"pk.fields": "id",
"snapshot.mode": "initial",
"key.converter.schema.registry.url": "http://data4tech:8081",
"value.converter.schema.registry.url": "http://data4tech:8081"
}
Bu oluşturduğumuz connector'leri ise aşağıdaki gibi görebilirsiniz.
6- CDC
Terminalden tekrar postgreqsl tarafına geçiş yaparak tablolarımızı oluşturacağız. Bir source tablosu oluşturuyoruz.
CREATE TABLE public.source_tb (
id serial PRIMARY KEY,
name VARCHAR (50) ,
surname VARCHAR (50) ,
job VARCHAR (50)
);
Sink tablomuzu JDBCSinkConnector tarafında auto.create ile oluştur komutu verdiğimiz için manuel oluşturmamıza gerek kalmıyor.
"INSERT" komutu ile başlayalım.
INSERT INTO public.source_tb VALUES(1,'micheal','dan','doctor');
INSERT INTO public.source_tb VALUES(2,'john','oliver','engineer');
INSERT INTO public.source_tb VALUES(3,'sam','curtis','police');
Gördüğünüz gibi source tablosuna attığımız insert komutları sink tablomuza da yazılmış durumda.
"UPDATE" komutu ile devam edelim.
Son olarak "DELETE" komutunu kullanalım.
Bütün komutlarımızı kullanarak source - sink tablolarımızda uyumu gördük.
Bu yaptığımız değişikliklerdeki kafka mesajlarını ise "Data Browser" sekmesindeki topic'lerimize bakıyoruz ve oluşturduğumuz topic'i görüyoruz. İçeriğine girerek bu mesajları görebiliriz.
Bu yazımda Kafka - Debezium ile CDC operasyonlarını anlattım.
Gelecek yazılarımızda görüşmek üzere, sağlıcakla kalın.
Comments