其實找了很多東西 有些蠻好的 有 fluentd , oracle OGG , canal 都不錯的 但有一種算簡單 也方便的方式 目前還在測試中 就是 KafKa connect
不過基本的串接已經可以達成 主要用 new PK + timestamp 的方式 做 資料抽取
當Table有新資料時,最簡單的方式,就是 select PKey last 大於的方試,可以找出來.那如果有異動的 update 其實就可以用 timestamp last 大於的方式,也可以找出來.差別就是 你同步的最後一筆的資訊能作為撈取的區間為何.基本上也是要排序!!
SELECT * FROM "TEST_USER" WHERE
"TEST_USER"."MODIFIED" < ? AND (("TEST_USER"."MODIFIED" = ? AND "TEST_USER"."ID" > ?)
OR "TEST_USER"."MODIFIED" > ?) ORDER BY "TEST_USER"."MODIFIED","TEST_USER"."ID" ASC.
這是 KafKa connect 下的條件 其實就是不停的 Query然後抓到的 Data 放進 Kafa Queue 裡但這也只是一半 還要處理 寫入 必須去重 說方便 但也不是非常聰明 ,至少能做到 insert update 的同步
curl -X POST http://192.168.250.35:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@192.168.250.233:1521/dbcenter",
"connection.user": "xxx",
"connection.password": "xxx",
"topics": "TEST_USER",
"name": "jdbc-sink",
"insert.mode":"upsert",
"db.timezone":"Asia/Shanghai",
"auto.create": "true",
"auto.evolve":"true",
"pk.fields":"ID",
"pk.mode":"record_value"
}
}'
從 topics
往目標DB 丟資訊 使用 upsert
對Oracle 剛好是merge 的方式
建立 Kafka Connect Source 讀取
curl -X POST http://192.168.250.35:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@192.168.0.230:1521/dbcenter",
"connection.user": "xxx",
"connection.password": "xxx",
"table.types":"TABLE",
"topic.prefix": "",
"table.whitelist" : "TEST_USER",
"mode":"timestamp",
"timestamp.column.name":"MODIFIED",
"poll.interval.ms" : 5000,
"db.timezone":"Asia/Shanghai",
"validate.non.null": false
}
}'
以上配置就能 用 ID and timestamp 做基本的 抽取備份 這範例是 Oracle to Oracle使用 JdbcSourceConnector
記得要裝 ojdbc.jar 放在share 的資料夾就可以了
https://docs.confluent.io/current/connect/references/allconfigs.html 配置細節
更多 connector 配置細節
https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html
https://docs.confluent.io/current/connect/index.html
https://docs.confluent.io/current/connect/managing/monitoring.html
沒有留言:
張貼留言