Re: Re: Re: How does Flink CDC convert captured change records into canal-json format and emit them to downstream Kafka?
Hi, the documentation [1] describes a pipeline like mysql (canal json) -> kafka -> flink -> other db, i.e. it mainly covers Flink *reading* the canal format. The conversion in the middle needs to be implemented by yourself. You can connect to MongoDB in the UDF's open() method and fetch what you need there, because currently a UDF cannot see the catalog.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format

--
Best!
Xuyang

On 2022-08-23 08:55:44, "casel.chen" wrote:
>The data flow is mongodb --> flink cdc --> kafka (canal json).
>The JSON string that flink cdc produces from the mongodb oplog looks like [1] below, while the downstream consumes canal-json messages from kafka, so the format conversion in between has to be implemented by ourselves, right?
>But the mongodb oplog carries no schema information and has no `old` field like canal does; how should that part be converted?
>
>Also, the canal-json data I send to kafka with the following Flink SQL is incomplete [2]; it is not a standard canal-json record [3]. Is this a known issue?
>
>CREATE TABLE mongo_customers (
>  _id STRING,
>  customer_id INT,
>  name STRING,
>  address STRING,
>  PRIMARY KEY (_id) NOT ENFORCED
>) WITH (
>  'connector' = 'mongodb-cdc',
>  'hosts' = 'localhost:27017',
>  'username' = 'mongouser',
>  'password' = 'mongopw',
>  'database' = 'mgdb',
>  'collection' = 'customers'
>);
>
>CREATE TABLE kafka_customers (
>  _id STRING,
>  customer_id INT,
>  name STRING,
>  address STRING,
>  PRIMARY KEY (_id) NOT ENFORCED
>) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'customers',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'format' = 'canal-json'
>);
>
>INSERT INTO kafka_customers SELECT * FROM mongo_customers;
>
>[1]
>{
>  "_id": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"copyingData\": true}",
>  "operationType": "insert",
>  "fullDocument": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"_class\": \"com.huifu.uqp.dal.mongo.model.TopTransOrder\", \"reqSeqId\": \"123\", \"ordId\": \"1440509760709632000\", \"outTransId\": \"123\", \"merOrdId\": \"123\", \"hfSeqId\": \"123\", \"partyOrderId\": \"123\", \"bankSeqId\": \"123\", \"orgOrdId\": \"123\", \"orgTermOrdId\": \"123\", \"orgHuifuSeqId\": \"123\", \"transDate\": \"20210913\", \"productId\": \"app8\", \"serviceId\": \"6767639\", \"topAgentId\": \"123\", \"belongAgentId\": \"123\", \"chainsId\": \"123\", \"huifuId\": \"84552350\", \"transMajorCategory\": \"123\", \"consoleActualPayChannel\": \"123\", \"consolePayType\": \"123\", \"consolePreAuthFlag\": \"123\", \"consoleSubsidyFlag\": \"123\", \"consoleDcType\": \"123\", \"consoleIsFq\": \"123\", \"consoleAcctDivFlag\": \"123\", \"actualPayChannel\": \"123\", \"payChannel\": \"123\", \"transType\": \"123\", \"payType\": \"123\", \"dcType\": \"123\", \"isAcctDiv\": \"123\", \"isDelayAcct\": \"123\", \"creditType\": \"123\", \"devsId\": \"123\", \"ordAmt\": 123.32, \"feeAmt\": 123.0, \"actOrdAmt\": 123.0, \"actualRefAmt\": 123.0, \"refAmt\": 123.0, \"refFeeAmt\": 123.0, \"subsidyAmt\": 123.0, \"subsidyRefAmt\": 123.0, \"payCardId\": \"123\", \"feeRecType\": \"123\", \"feeFlag\": \"123\", \"transStat\": \"S\", \"createTime\": {\"$date\": 1632279264987}, \"transFinishTime\": \"123\", \"kafkaTime\": \"123\", \"tableName\": \"123\", \"offset\": \"123\", \"recordVersion\": \"123\", \"sign\": \"123\"}",
>  "source": {
>    "ts_ms": 0,
>    "snapshot": "true"
>  },
>  "ns": {
>    "db": "amp_test",
>    "coll": "TopTransOrder"
>  },
>  "to": null,
>  "documentKey": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}}",
>  "updateDescription": null,
>  "clusterTime": null,
>  "txnNumber": null,
>  "lsid": null
>}
>
>[2]
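The conversion Xuyang says has to be self-implemented can be sketched as a plain JSON reshaping of the mongodb-cdc change event [1] into a canal-json-shaped message. This is only a sketch under assumptions: the field mapping below is mine, not from any official converter, and as the thread notes the MongoDB oplog carries no before-image, so canal's `old` field can only be left null here.

```python
import json

# Map MongoDB change-stream operation types to canal's "type" values.
# "replace" is treated as an UPDATE; this mapping is an assumption.
OP_TO_CANAL = {"insert": "INSERT", "update": "UPDATE",
               "replace": "UPDATE", "delete": "DELETE"}

def mongo_event_to_canal(event: dict) -> dict:
    """Reshape a mongodb-cdc change event (as in [1]) into a canal-json-like dict."""
    # fullDocument arrives as an extended-JSON string, so parse it first.
    doc = json.loads(event["fullDocument"]) if event.get("fullDocument") else None
    return {
        "data": [doc] if doc is not None else None,
        "old": None,  # not recoverable: the oplog has no before-image
        "type": OP_TO_CANAL[event["operationType"]],
        "database": event["ns"]["db"],
        "table": event["ns"]["coll"],
        "es": event["source"].get("ts_ms", 0),
        "ts": event["source"].get("ts_ms", 0),
        "isDdl": False,
        "pkNames": ["_id"],
    }
```

Schema information (canal's `mysqlType`/`sqlType`) is likewise absent from the oplog; it would have to come from elsewhere, e.g. fetched from MongoDB in the UDF's open() as suggested above.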
Re: Re: How does Flink CDC convert captured change records into canal-json format and emit them to downstream Kafka?
The data flow is mongodb --> flink cdc --> kafka (canal json).
The JSON string that flink cdc produces from the mongodb oplog looks like [1] below, while the downstream consumes canal-json messages from kafka, so the format conversion in between has to be implemented by ourselves, right?
But the mongodb oplog carries no schema information and has no `old` field like canal does; how should that part be converted?

Also, the canal-json data I send to kafka with the following Flink SQL is incomplete [2]; it is not a standard canal-json record [3]. Is this a known issue?

CREATE TABLE mongo_customers (
  _id STRING,
  customer_id INT,
  name STRING,
  address STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = 'mongopw',
  'database' = 'mgdb',
  'collection' = 'customers'
);

CREATE TABLE kafka_customers (
  _id STRING,
  customer_id INT,
  name STRING,
  address STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json'
);

INSERT INTO kafka_customers SELECT * FROM mongo_customers;

[1]
{
  "_id": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"copyingData\": true}",
  "operationType": "insert",
  "fullDocument": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"_class\": \"com.huifu.uqp.dal.mongo.model.TopTransOrder\", \"reqSeqId\": \"123\", \"ordId\": \"1440509760709632000\", \"outTransId\": \"123\", \"merOrdId\": \"123\", \"hfSeqId\": \"123\", \"partyOrderId\": \"123\", \"bankSeqId\": \"123\", \"orgOrdId\": \"123\", \"orgTermOrdId\": \"123\", \"orgHuifuSeqId\": \"123\", \"transDate\": \"20210913\", \"productId\": \"app8\", \"serviceId\": \"6767639\", \"topAgentId\": \"123\", \"belongAgentId\": \"123\", \"chainsId\": \"123\", \"huifuId\": \"84552350\", \"transMajorCategory\": \"123\", \"consoleActualPayChannel\": \"123\", \"consolePayType\": \"123\", \"consolePreAuthFlag\": \"123\", \"consoleSubsidyFlag\": \"123\", \"consoleDcType\": \"123\", \"consoleIsFq\": \"123\", \"consoleAcctDivFlag\": \"123\", \"actualPayChannel\": \"123\", \"payChannel\": \"123\", \"transType\": \"123\", \"payType\": \"123\", \"dcType\": \"123\", \"isAcctDiv\": \"123\", \"isDelayAcct\": \"123\", \"creditType\": \"123\", \"devsId\": \"123\", \"ordAmt\": 123.32, \"feeAmt\": 123.0, \"actOrdAmt\": 123.0, \"actualRefAmt\": 123.0, \"refAmt\": 123.0, \"refFeeAmt\": 123.0, \"subsidyAmt\": 123.0, \"subsidyRefAmt\": 123.0, \"payCardId\": \"123\", \"feeRecType\": \"123\", \"feeFlag\": \"123\", \"transStat\": \"S\", \"createTime\": {\"$date\": 1632279264987}, \"transFinishTime\": \"123\", \"kafkaTime\": \"123\", \"tableName\": \"123\", \"offset\": \"123\", \"recordVersion\": \"123\", \"sign\": \"123\"}",
  "source": {
    "ts_ms": 0,
    "snapshot": "true"
  },
  "ns": {
    "db": "amp_test",
    "coll": "TopTransOrder"
  },
  "to": null,
  "documentKey": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}}",
  "updateDescription": null,
  "clusterTime": null,
  "txnNumber": null,
  "lsid": null
}

[2]
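One plausible explanation for the "incomplete" output in [2]: Flink's canal-json serialization emits only the row payload (`data`, `type`, and `old` where applicable), not canal's metadata fields. The comparison below is a sketch under that assumption; the concrete row values are made up for illustration.

```python
# What a canal-json sink is observed to emit (row payload only) versus the
# full canal envelope. The example values here are illustrative, not real output.
flink_emitted = {"data": [{"_id": "x", "customer_id": 1}], "type": "INSERT"}

full_canal = {**flink_emitted,
              "old": None,            # previous values (UPDATE only)
              "database": "mgdb",
              "table": "customers",
              "es": 0,                # source event time (ms)
              "ts": 0,                # canal processing time (ms)
              "isDdl": False,
              "pkNames": ["_id"]}

# Envelope fields absent from the sink output:
missing = sorted(set(full_canal) - set(flink_emitted))
print(missing)
```

If the downstream parser insists on the full envelope, those metadata fields would need to be filled in by a custom serializer or an intermediate job.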
Re: How does Flink CDC convert captured change records into canal-json format and emit them to downstream Kafka?
Hi, is your requirement "debezium data" -> flink -> "canal"? If so, you can try a UDF [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/

On 2022-08-21 10:49:29, "casel.chen" wrote:
>How does flink cdc convert captured change records into canal-json format and emit them to downstream kafka?
>flink cdc captures records in debezium format (using JsonDebeziumDeserializationSchema); how can they be converted into canal-json output? Is there an example or key code to show? Thanks!
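The core of the "debezium data" -> "canal" conversion a UDF would perform can be sketched as a small reshaping of the Debezium envelope (its `op`, `before`, `after` fields) into canal's shape. This is a minimal sketch, not an official converter; the function name is mine.

```python
# Debezium uses single-letter op codes (c=create, r=snapshot read, u=update,
# d=delete); canal uses verb strings. Snapshot reads are mapped to INSERT.
DEBEZIUM_OP_TO_CANAL = {"c": "INSERT", "r": "INSERT", "u": "UPDATE", "d": "DELETE"}

def debezium_to_canal(record: dict) -> dict:
    """Reshape a Debezium change record's envelope into a canal-json-like dict."""
    before, after = record.get("before"), record.get("after")
    return {
        # For deletes, canal's "data" carries the deleted row (the before-image).
        "data": [after] if after is not None else [before],
        # canal's "old" holds previous values; only an update has them.
        "old": [before] if record["op"] == "u" and before is not None else None,
        "type": DEBEZIUM_OP_TO_CANAL[record["op"]],
    }
```

Because Debezium records do carry a `before` image (unlike the MongoDB oplog discussed later in this thread), `old` can actually be populated here.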