Re:Re:Re: How can Flink CDC convert captured change records to canal-json and write them to downstream Kafka?

2022-08-22, Xuyang
Hi, the documentation [1] describes a pipeline like mysql (canal json) -> kafka -> flink -> other db; it mainly covers the case of Flink reading the canal format.


The conversion in the middle has to be implemented yourself. You could connect to MongoDB in a UDF's open() method and fetch what you need (e.g. schema information) there, because at the moment a UDF cannot see the catalog.
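The suggested conversion can first be sketched as a plain function outside Flink. Below is a minimal illustration in Python (standard library only, not an actual Flink UDF): it maps one MongoDB change-stream record, shaped like the sample [1] later in this thread, into a canal-json envelope. The canal-side field names (data, old, type, database, table) follow the canal-json format; the operation-type mapping is an assumption for illustration.

```python
import json

# Assumed mapping from MongoDB change-stream operation types
# to canal-json "type" values (canal itself emits INSERT/UPDATE/DELETE).
_OP_MAP = {"insert": "INSERT", "update": "UPDATE",
           "replace": "UPDATE", "delete": "DELETE"}

def mongo_event_to_canal_json(event_json: str) -> str:
    """Convert one MongoDB change-stream event (as serialized by
    JsonDebeziumDeserializationSchema for mongodb-cdc) into a
    canal-json envelope. The oplog carries no pre-image, so the
    canal "old" field cannot be reconstructed and stays null."""
    event = json.loads(event_json)
    # fullDocument is itself a JSON string in the mongodb-cdc output
    full_doc = event.get("fullDocument")
    row = json.loads(full_doc) if full_doc else None
    canal = {
        "data": [row] if row is not None else None,
        "old": None,  # not recoverable from an oplog without pre-images
        "type": _OP_MAP.get(event["operationType"],
                            event["operationType"].upper()),
        "database": event["ns"]["db"],
        "table": event["ns"]["coll"],
        "isDdl": False,
    }
    return json.dumps(canal)
```

In a real job this logic would sit in a map function or UDF, and any schema details canal-json consumers expect (e.g. column types) would have to be looked up separately, for instance from MongoDB in open() as suggested above.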





[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format




--

Best!
Xuyang





On 2022-08-23 08:55:44, "casel.chen" wrote:

Re:Re: How can Flink CDC convert captured change records to canal-json and write them to downstream Kafka?

2022-08-22, casel.chen
The data flow is mongodb --> flink cdc --> kafka (canal json).
The JSON string that Flink CDC produces from the MongoDB oplog looks like [1] below, while the downstream needs to consume canal-json messages from Kafka. Do I have to implement the format conversion in between myself? Also, the MongoDB oplog carries no schema information and has nothing like canal's old field, so how should that part be filled in?


Additionally, when I use the Flink SQL below to send canal-json data to Kafka, the output is incomplete [2]; it is not a standard canal-json record [3]. Is this a known issue?
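For reference, a complete canal-json record of the kind canal itself emits carries roughly the fields below (a simplified illustration with made-up values, not output from this pipeline):

```json
{
  "data": [{"_id": "1", "customer_id": 101, "name": "Alice", "address": "Beijing"}],
  "old": null,
  "type": "INSERT",
  "database": "mgdb",
  "table": "customers",
  "es": 1661126400000,
  "ts": 1661126400123,
  "isDdl": false,
  "pkNames": ["_id"]
}
```

Flink's canal-json serializer only writes the subset of fields it needs (roughly data, old and type), which may be why the output in [2] looks incomplete compared with records produced by canal itself.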


CREATE TABLE mongo_customers (
    _id STRING,
    customer_id INT,
    name STRING,
    address STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'mongouser',
    'password' = 'mongopw',
    'database' = 'mgdb',
    'collection' = 'customers'
);

CREATE TABLE kafka_customers (
    _id STRING,
    customer_id INT,
    name STRING,
    address STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'customers',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'canal-json'
);

INSERT INTO kafka_customers SELECT * FROM mongo_customers;


[1]
{
"_id": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"copyingData\": 
true}",
"operationType": "insert",
"fullDocument": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, 
\"_class\": \"com.huifu.uqp.dal.mongo.model.TopTransOrder\", \"reqSeqId\": 
\"123\", \"ordId\": \"1440509760709632000\", \"outTransId\": \"123\", 
\"merOrdId\": \"123\", \"hfSeqId\": \"123\", \"partyOrderId\": \"123\", 
\"bankSeqId\": \"123\", \"orgOrdId\": \"123\", \"orgTermOrdId\": \"123\", 
\"orgHuifuSeqId\": \"123\", \"transDate\": \"20210913\", \"productId\": 
\"app8\", \"serviceId\": \"6767639\", \"topAgentId\": \"123\", 
\"belongAgentId\": \"123\", \"chainsId\": \"123\", \"huifuId\": 
\"84552350\", \"transMajorCategory\": \"123\", \"consoleActualPayChannel\": 
\"123\", \"consolePayType\": \"123\", \"consolePreAuthFlag\": \"123\", 
\"consoleSubsidyFlag\": \"123\", \"consoleDcType\": \"123\", \"consoleIsFq\": 
\"123\", \"consoleAcctDivFlag\": \"123\", \"actualPayChannel\": \"123\", 
\"payChannel\": \"123\", \"transType\": \"123\", \"payType\": \"123\", 
\"dcType\": \"123\", \"isAcctDiv\": \"123\", \"isDelayAcct\": \"123\", 
\"creditType\": \"123\", \"devsId\": \"123\", \"ordAmt\": 123.32, \"feeAmt\": 
123.0, \"actOrdAmt\": 123.0, \"actualRefAmt\": 123.0, \"refAmt\": 123.0, 
\"refFeeAmt\": 123.0, \"subsidyAmt\": 123.0, \"subsidyRefAmt\": 123.0, 
\"payCardId\": \"123\", \"feeRecType\": \"123\", \"feeFlag\": \"123\", 
\"transStat\": \"S\", \"createTime\": {\"$date\": 1632279264987}, 
\"transFinishTime\": \"123\", \"kafkaTime\": \"123\", \"tableName\": \"123\", 
\"offset\": \"123\", \"recordVersion\": \"123\", \"sign\": \"123\"}",
"source": {
"ts_ms": 0,
"snapshot": "true"
},
"ns": {
"db": "amp_test",
"coll": "TopTransOrder"
},
"to": null,
"documentKey": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}}",
"updateDescription": null,
"clusterTime": null,
"txnNumber": null,
"lsid": null
}




[2]

Re: How can Flink CDC convert captured change records to canal-json and write them to downstream Kafka?

2022-08-22, Xuyang
Hi, is your requirement "debezium data" - flink - "canal"?
If so, you could try implementing it with a UDF [1].
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
On 2022-08-21 10:49:29, "casel.chen" wrote:
>How can Flink CDC convert the captured change records into canal-json and write them to downstream Kafka?
>Flink CDC produces Debezium-format records (I am using JsonDebeziumDeserializationSchema); how should they be converted into canal-json for output? Is there an example or some key code to share? Thanks!
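The body of such a UDF can be illustrated as a plain function (a hedged sketch in Python rather than an actual Flink UDF; the mapping follows the publicly documented Debezium envelope and canal-json layouts):

```python
import json

# Debezium "op" codes -> canal-json "type" values.
_OP_TO_TYPE = {"c": "INSERT", "r": "INSERT", "u": "UPDATE", "d": "DELETE"}

def debezium_to_canal_json(record_json: str) -> str:
    """Convert one Debezium envelope (op/before/after/source) into a
    canal-json envelope (type/data/old/database/table)."""
    rec = json.loads(record_json)
    payload = rec.get("payload", rec)  # schema-less records have no "payload" wrapper
    op = payload["op"]
    before, after = payload.get("before"), payload.get("after")
    canal = {
        # canal puts the deleted row in "data", so fall back to "before" on deletes
        "data": [after] if after is not None
                else ([before] if before is not None else None),
        # pre-image of an update; real canal output keeps only changed columns here
        "old": [before] if op == "u" and before is not None else None,
        "type": _OP_TO_TYPE[op],
        "database": payload.get("source", {}).get("db"),
        "table": payload.get("source", {}).get("table"),
        "isDdl": False,
    }
    return json.dumps(canal)
```

One caveat on the design: canal's "old" carries only the changed columns, while Debezium's "before" carries the full pre-image row, so downstream consumers that diff old against data should tolerate that difference.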

