
Bu yazımda Docker ortamında PostgreSQL, Debezium, Kafka ve Elasticsearch kurulumu ve basit bir beraber kullanım örneğini paylaşacağım. PostgreSQL’den ElasticSearch’e Gerçek Zamanlı Veri Senkronizasyonu
Buradaki amacımız PostgreSQL veri tabanında yapısal verilerimizden ihtiyacımız olan kısmını yani Full Text Search yapmak istediğimiz tabloları yada kolonları Elasticsearch platformuna çıkaracağız. Debezium ve Kafkayı da anlık PostgreSQL veri tabanında değişen veriyi yakalayıp(CDC — Change Data Capture) Elasticsearch platformuna aktarmak için kullanacağız. Fakat şunu belirtmek istiyorum bu yazımızda bir kullanım örneği göreceğiz detaylı konfigürasyon ayar vb. yapmayacağız sonuçta PRODUCTION ortam kurmuyoruz o yüzden en basit şekilde yazılımlarımızı docker da ayağa kaldıracağız. Temennim mevcut yada geliştirmeyi düşündüğünüz projelerinizde sizlere bir fikir verir. Hadi basit bir PostgreSQL to Elasticsearch CDC operasyonu yapalım.
Docker Ortamında Platformların Ayağa Kaldırılması
İlk aşama olarak Docker ortamında servislerin ayağa kaldırılması adımlarını atlıyorum çünkü bu adımları kendi GitHub reposumda açıklayıcı bir şekilde anlattım. Aşağıdaki GitHub linki ile dökümana gidip Docker ortamında servisleri ayağa kaldırmanız gerekiyor.
git clone https://github.com/atfatmc/CDC-PostgreSQL-to-Elasticsearch
- Postgresql
– Şanlı şerefli PostgreSQL Veri tabanı hizmeti sağlar ve genellikle uygulamaların veri depolama ihtiyaçlarını karşılar. - Elasticsearch
– Elasticsearch, bilindiği gibi arama ve veri analizi için kullanılan açık kaynaklı arama motoru ve veri analitiği platformu, arama, filtreleme ve analizler için hızlı ve profesyoneldir. - Kafka
– Kafka, gerçek zamanlı veri akış platformudur. Yüksek hacimli veri akışlarını işlemek, depolamak ve yönetmek için kullanılır. Veri akışlarını üretici-tüketici modeline göre yönetir ve ölçeklenebilir bir altyapı sağlar. - Zookeeper
– Zookeeper, dağıtık sistemlerin eşgüdümünü sağlayan bir hizmet protokolü sunucusudur. Kafka gibi dağıtık sistemlerin yönetimi ve koordinasyonu için kritik bir bileşendir. - Debezium
– Debezium, veritabanındaki değişiklikleri yakalayarak veri akışlarını oluşturan bir açık kaynaklı CDC (Change Data Capture) platformudur. Farklı veri tabanı sistemlerinden (PostgreSQL, MySQL, MongoDB vb.) gerçek zamanlı veri değişimlerini yakalamak için kullanılır. - Kowl
– Kowl, Kafka sistemlerinin yönetimi, izlenmesi ve görselleştirilmesi için kullanılan bir web tabanlı kullanıcı arayüzüdür.

Uygulama Arayüzlerinin Kontrolü

http://localhost:8080 (Kowl Kafka dashboard) OK

Dbeaver PostgreSQL bağlantısı OK
Connector oluşturmak için start.sh dosyasını kullanabilirsiniz. Ayrıca elle ayarlamak isterseniz Postman ve VS Code extension kullanabilirsiniz.
Postman üzerinden Debezium API’lerini kullanarak PostgreSQL’e bir Source Connector oluşturalım

{
"name": "cdc_db-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"topic.prefix": "pg",
"snapshot.mode": "never",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.server.id": "184054",
"database.dbname": "cdc_db",
"database.server.name": "cdc1",
"database.whitelist": "cdc_db",
"database.history.kafka.bootstrap.servers": "broker:29092",
"database.history.kafka.topic": "schema-changes.test_db",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "10",
"topic.creation.default.cleanup.policy": "compact"
}
}
Connector nedir ne işe yarar nasıl çalışır?
Source Connector:
Debezium’un veri değişikliklerini yakaladığı kaynak sistemin bağlantısını sağlar.
PostgreSQL, MySQL, MongoDB gibi veritabanlarına bağlanarak, bu veritabanlarındaki değişiklikleri izler ve yakalamak için kullanılır.
Veritabanı loglama mekanizmalarını (örneğin PostgreSQL’de Logical Replication) kullanarak, gerçek zamanlı veri değişikliklerini yakalar.
Yakalanan değişiklikler, Kafka veri akışına iletilir.
Hemen bir Checklist oluşturalım
- Kowl bağlantımız OK
- PostgreSQL veri tabanı bağlantısı OK
- Postgresql Connector oluşturma OK
Elasticsearch Bağlantısı
Docker da ayağa kaldırdığımız Elasticsearch’e bağlantı yapıp indexleri, veriyi görmemiz ayrıca REST servisini kullanmamız lazım aslında bu hizmet için genel olarak ve profesyonel olan Kibana
kullanılıyor fakat nihayetinde bu yazımızda prod ortam kurmuyoruz dolayısıyla bize işimizi görecek bir eklenti yeter. Google Chrome kullanıyorsanız “Elasticvue” eklentisi ile ihtiyacımız olan ekranlara
erişebileceğiz ayrıca REST servislerini kullanabileceğiz.

ADD ELASTICSEARCH CLUSTER butonu ile docker da kaldırdığımız Elasticsearch clusterımızı bağlayacağız.

Cluster Name ve Url bilgilerini düzenleyip CONNECT butonu ile bağlantımızı yapıyoruz.

Menüde REST sekmesinden Elasticsearch clusterımızda oluşturmak istediğimiz index ismiyle ve PUT methodunu kullanarak mapping oluşturuyoruz. tabi ki bu oluşturduğumuz mapping de aşağıda bahsettiğim tüm özellikleri tanımlamadık basit tanımlar yaptık. PostgreSQL’den Elasticsearch’e CDC yaparken, PostgreSQL’deki tabloların yapısına uygun olarak Elasticsearch’te doğru mapping tanımlarının yapılması önemli dikkat edelim.
- Neden Mapping tanımlıyoruz Mapping ne işe yarar?
— Elasticsearch te Mapping tanımlarken aslında yapmış olduğumuz aksiyonlar arasında indexdeki field veri tiplerini tanımlıyoruz
— Analiz ayarlamaları yapmamıza imkan veriyor.
— Arama ve Filtreleme özellikleri tanımlayabiliyoruz.

Elasticsearch ilk indeximiz 1 Shard 0 Replica olacak şekilde oluştu. Shard ve Replica kavramları yazımız konusu olmadığından yazmayacağım.
Postman üzerinden Debezium API’lerini kullanarak bir Sink Connector oluşturalım
Elasticsearch veriyi yazacağımız indeximiz hazır ve artık Sink Connector oluşturabiliriz. Postman üzerinden Debezium API’lerini kullanarak bir Sink Connector oluşturalım.

{
"name": "es-sink-comments",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "comments",
"connection.url": "http://elasticsearch:9200",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"key.ignore": "false",
"type.name": "_doc",
"behavior.on.null.values": "delete"
}
}
Final şimdi test zamanı
UPDATE "comments"
SET "comment" = 'Bu bir deneme yorumudur. Amacımız bu alanın Elasticsearch de full text search yapılmasını istiyorum. 1919'
WHERE id=19
Dbeaver üzerinden yaptığımız bağlantıyı kullanarak update sql’imizi gönderelim.
Kowl kontrol edelim yaptığımız değişiklik Kafka ya düştü mü?

Elasticvue kontrol edelim yaptığımız değişiklikle beraber veri diğer kolonlarıyla beraber Elasticsearch e düştü mü?

UPDATE "comments"
SET "comment" = 'Bu bir deneme yorumudur. Amacımız bu alanın Elasticsearch de full text search yapılmasını istiyorum. 1910'
WHERE id=10
UPDATE "comments"
SET "comment" = 'Bu bir deneme yorumudur. Amacımız bu alanın Elasticsearch de full text search yapılmasını istiyorum. 191919'
WHERE id=19

Son 2 update SQL de başarılı bir şekilde Önce Kafka ya onun üzerinden de Elasticsearch’e aktarıldı.
NOT: PostgreSQL comment tablosundaki tüm veriyi Elasticsearch e aktardığınızı varsayarak CDC işlemini gerçekleştirdik.
NOT: Bu yazımızda yapmaya çalıştığımız şey PostgreSQL veri tabanında bulunan yapısal ilişkisel verimizden ister tamamı ister belirli bir kısmını Elasticsearch’e gerçek zamanlı olarak aktarıp mevcut yada gerçekleştirmeyi düşündüğümüz bir projede ilgili alandan Full Text Search yapmak oldu.
NOT: PostgreSQL’den ElasticSearch’e Veri Senkronizasyonu yukarıdaki tüm aşamaları başarıyla yaptık ama bazı kavramlar boşlukta kaldı örn: shard, replica, sink connector belirttiğim gibi Production ortam kurmadık amacımız kullanım örneği görmek umarım mevcut yada gerçekleştirmeyi düşündüğünüz projelerinizde fikir verir.
Elasticvue Google Chrome Extension için Tıklayın.
Diğer Docker yazılarımız için Tıklayın.
Leave a Comment