星期二, 10月 15, 2019

KafKa connect

 當你需要ETL 時  你會用啥工具
 其實找了很多東西  有些蠻好的 有 fluentd , oracle OGG ,  canal 都不錯的 但有一種算簡單 也方便的方式  目前還在測試中  就是 KafKa connect

不過基本的串接已經可以達成 主要用 new PK + timestamp 的方式 做 資料抽取

當Table有新資料時,最簡單的方式,就是 select PKey last 大於的方試,可以找出來.那如果有異動的 update 其實就可以用 timestamp last 大於的方式,也可以找出來.差別就是 你同步的最後一筆的資訊能作為撈取的區間為何.基本上也是要排序!!
  SELECT * FROM "TEST_USER" WHERE
  "TEST_USER"."MODIFIED" < ?  AND (("TEST_USER"."MODIFIED" = ?  AND "TEST_USER"."ID" > ?)
  OR "TEST_USER"."MODIFIED" > ?)  ORDER BY "TEST_USER"."MODIFIED","TEST_USER"."ID" ASC.


這是 KafKa connect 下的條件 其實就是不停的  Query然後抓到的 Data 放進 Kafa Queue 裡但這也只是一半 還要處理 寫入 必須去重   說方便 但也不是非常聰明 ,至少能做到 insert update 的同步



建立 Kafka Connect sink 寫入  請使用5.3版 db.timezone 很重要的

curl -X POST http://192.168.250.35:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "jdbc-sink",
    "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:oracle:thin:@192.168.250.233:1521/dbcenter",
            "connection.user": "xxx",
            "connection.password": "xxx",
            "topics": "TEST_USER",
            "name": "jdbc-sink",
            "insert.mode":"upsert",
            "db.timezone":"Asia/Shanghai",
            "auto.create": "true",
            "auto.evolve":"true",
            "pk.fields":"ID",
            "pk.mode":"record_value"
            }
    }'

從 topics
往目標DB 丟資訊 使用 upsert
對Oracle 剛好是merge 的方式


建立 Kafka Connect Source 讀取

curl -X POST http://192.168.250.35:8083/connectors -H "Content-Type: application/json" -d '{
            "name": "jdbc_source_oracle_01",
            "config": {
                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                    "connection.url": "jdbc:oracle:thin:@192.168.0.230:1521/dbcenter",
                    "connection.user": "xxx",
                    "connection.password": "xxx",
                    "table.types":"TABLE",
                    "topic.prefix": "",
                    "table.whitelist" : "TEST_USER",
                    "mode":"timestamp",
                    "timestamp.column.name":"MODIFIED",
                    "poll.interval.ms" : 5000,
                    "db.timezone":"Asia/Shanghai",
                    "validate.non.null": false
                    }
            }'

以上配置就能 用  ID and timestamp 做基本的 抽取備份 這範例是 Oracle to Oracle使用 JdbcSourceConnector
記得要裝 ojdbc.jar 放在share 的資料夾就可以了

https://docs.confluent.io/current/connect/references/allconfigs.html 配置細節

更多 connector 配置細節
https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html

https://docs.confluent.io/current/connect/index.html

https://docs.confluent.io/current/connect/managing/monitoring.html





沒有留言: