星期二, 10月 15, 2019

KafKa connect

 當你需要ETL 時  你會用啥工具
 其實找了很多東西  有些蠻好的 有 fluentd , oracle OGG ,  canal 都不錯的 但有一種算簡單 也方便的方式  目前還在測試中  就是 KafKa connect

不過基本的串接已經可以達成 主要用 new PK + timestamp 的方式 做 資料抽取

當Table有新資料時,最簡單的方式,就是 select PKey last 大於的方試,可以找出來.那如果有異動的 update 其實就可以用 timestamp last 大於的方式,也可以找出來.差別就是 你同步的最後一筆的資訊能作為撈取的區間為何.基本上也是要排序!!
  SELECT * FROM "TEST_USER" WHERE
  "TEST_USER"."MODIFIED" < ?  AND (("TEST_USER"."MODIFIED" = ?  AND "TEST_USER"."ID" > ?)
  OR "TEST_USER"."MODIFIED" > ?)  ORDER BY "TEST_USER"."MODIFIED","TEST_USER"."ID" ASC.


這是 KafKa connect 下的條件 其實就是不停的  Query然後抓到的 Data 放進 Kafa Queue 裡但這也只是一半 還要處理 寫入 必須去重   說方便 但也不是非常聰明 ,至少能做到 insert update 的同步



建立 Kafka Connect sink 寫入  請使用5.3版 db.timezone 很重要的

curl -X POST http://192.168.250.35:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "jdbc-sink",
    "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:oracle:thin:@192.168.250.233:1521/dbcenter",
            "connection.user": "xxx",
            "connection.password": "xxx",
            "topics": "TEST_USER",
            "name": "jdbc-sink",
            "insert.mode":"upsert",
            "db.timezone":"Asia/Shanghai",
            "auto.create": "true",
            "auto.evolve":"true",
            "pk.fields":"ID",
            "pk.mode":"record_value"
            }
    }'

從 topics
往目標DB 丟資訊 使用 upsert
對Oracle 剛好是merge 的方式


建立 Kafka Connect Source 讀取

curl -X POST http://192.168.250.35:8083/connectors -H "Content-Type: application/json" -d '{
            "name": "jdbc_source_oracle_01",
            "config": {
                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                    "connection.url": "jdbc:oracle:thin:@192.168.0.230:1521/dbcenter",
                    "connection.user": "xxx",
                    "connection.password": "xxx",
                    "table.types":"TABLE",
                    "topic.prefix": "",
                    "table.whitelist" : "TEST_USER",
                    "mode":"timestamp",
                    "timestamp.column.name":"MODIFIED",
                    "poll.interval.ms" : 5000,
                    "db.timezone":"Asia/Shanghai",
                    "validate.non.null": false
                    }
            }'

以上配置就能 用  ID and timestamp 做基本的 抽取備份 這範例是 Oracle to Oracle使用 JdbcSourceConnector
記得要裝 ojdbc.jar 放在share 的資料夾就可以了

https://docs.confluent.io/current/connect/references/allconfigs.html 配置細節

更多 connector 配置細節
https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html

https://docs.confluent.io/current/connect/index.html

https://docs.confluent.io/current/connect/managing/monitoring.html





玩弄Kafka

如果你只是想拿來當 Queue   這個功能就過之了

也可能沒這樣好用 裝個rabbitmq 可能會很好 還有比較好的介面使用


用conflunt 裝好 也啟動之後 就來 弄個 consumer吧

/bin 底下 有很多 執行 shell 可以使用


 ./kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic TEST

--from-beginning  會從 0 開始 沒有offset

但是這樣會從 Topic Test裡面 拿出從古至今所有的 Data
如果不需要 就拿掉這段 就是一般的 先進先出 Queue


弄個 Producer吧

./kafka-console-producer --broker-list localhost:9092 --topic test

Standard Input 輸入幾個 Msg
另一編的 consumer 就收到訊息了
簡易的  KafKa 就 完成了

confluent kafka 速記

最近在玩Kafka

confluent 整合kafka 的生態系 包含zookeeper ksql connect 


使用套件  confluent  安裝 版本  5.3.1  從 https://www.confluent.io 下載之後 zip 檔案之後 解開

第一次使用時 用的是 5.1 有幾個 connector 的 bug 5.3 就修正了 這邊直接用 5.3.1 的方式 紀錄一下

先幫 confluent/bin 加入   環境變數



vi /etc/profile
export PATH=/root/confluent-5.3.1/bin:$PATH
source /etc/profile



然後 5.3 之後需要 下載 cli 使用

curl -L https://cnfl.io/cli | sh -s -- -b /<path-to-directory>/bin

https://docs.confluent.io/current/cli/installing.html

放到  /bin 後  就可以用了


confluent local start  就可以啟動  kafka  然後進行測試
如果需要 connect 

control-center.properties   


confluent.controlcenter.connect.cluster=http://localhost:8083

confluent.controlcenter.connect.connect-default.cluster=http://localhost:8083


這需要打開  

connect-default 是local 預設的  配置項


基本上就 搞定了  再來就可以去玩 kafka 




下篇會 寫點 connect 小小的紀錄



星期日, 1月 06, 2019

webpack memo

npm cache clean --force

npm install
npm run prod:d
npm run prod:m



editbin.exe /NXCOMPAT:NO c:\project\bin\Release\WindowsFormsApplication.exe