Hi,

The documentation [1] describes a pipeline along the lines of MySQL (Canal JSON) -> Kafka -> Flink -> other DB; it mainly covers Flink reading the canal format.
You will need to implement the intermediate conversion yourself. Since a UDF currently has no access to the catalog, one option is to connect to MongoDB in the UDF's open method and fetch what you need there.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format

--
Best!
Xuyang

At 2022-08-23 08:55:44, "casel.chen" <casel_c...@126.com> wrote:
> The data flow is mongodb --> flink cdc --> kafka (canal json)
> The MongoDB oplog record parsed by Flink CDC, converted to a JSON string, looks like [1] below, while the downstream needs to consume canal json messages from Kafka. So the format conversion in between has to be implemented by ourselves, right?
> But the MongoDB oplog carries no schema information, and it has nothing corresponding to the old field in canal. How should this information be converted?
>
> Also, the canal json data I send to Kafka with the Flink SQL below is incomplete [2]; it is not standard canal json [3]. Is this a known issue?
>
> CREATE TABLE mongo_customers (
>   _id STRING,
>   customer_id INT,
>   name STRING,
>   address STRING,
>   PRIMARY KEY (_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mongodb-cdc',
>   'hosts' = 'localhost:27017',
>   'username' = 'mongouser',
>   'password' = 'mongopw',
>   'database' = 'mgdb',
>   'collection' = 'customers'
> );
> CREATE TABLE kafka_customers (
>   _id STRING,
>   customer_id INT,
>   name STRING,
>   address STRING,
>   PRIMARY KEY (_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'customers',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'format' = 'canal-json'
> );
> INSERT INTO kafka_customers SELECT * FROM mongo_customers;
>
> [1]
> {
> "_id": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"copyingData\":
> true}",
> "operationType": "insert",
> "fullDocument": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"},
> \"_class\": \"com.huifu.uqp.dal.mongo.model.TopTransOrder\", \"reqSeqId\":
> \"123\", \"ordId\": \"1440509760709632000\", \"outTransId\": \"123\",
> \"merOrdId\": \"123\", \"hfSeqId\": \"123\", \"partyOrderId\": \"123\",
> \"bankSeqId\": \"123\", \"orgOrdId\": \"123\", \"orgTermOrdId\": \"123\",
> \"orgHuifuSeqId\": \"123\", \"transDate\": \"20210913\", \"productId\":
> \"app8\", \"serviceId\": \"6767639\", \"topAgentId\": \"123\",
> \"belongAgentId\": \"123\", \"chainsId\": \"123\", \"huifuId\":
> \"666684552350\", \"transMajorCategory\": \"123\",
> \"consoleActualPayChannel\": \"123\", \"consolePayType\": \"123\",
> \"consolePreAuthFlag\": \"123\", \"consoleSubsidyFlag\": \"123\",
> \"consoleDcType\": \"123\", \"consoleIsFq\": \"123\", \"consoleAcctDivFlag\":
> \"123\", \"actualPayChannel\": \"123\", \"payChannel\": \"123\",
> \"transType\": \"123\", \"payType\": \"123\", \"dcType\": \"123\",
> \"isAcctDiv\": \"123\", \"isDelayAcct\": \"123\", \"creditType\": \"123\",
> \"devsId\": \"123\", \"ordAmt\": 123.32, \"feeAmt\": 123.0, \"actOrdAmt\":
> 123.0, \"actualRefAmt\": 123.0, \"refAmt\": 123.0, \"refFeeAmt\": 123.0,
> \"subsidyAmt\": 123.0, \"subsidyRefAmt\": 123.0, \"payCardId\": \"123\",
> \"feeRecType\": \"123\", \"feeFlag\": \"123\", \"transStat\": \"S\",
> \"createTime\": {\"$date\": 1632279264987}, \"transFinishTime\": \"123\",
> \"kafkaTime\": \"123\", \"tableName\": \"123\", \"offset\": \"123\",
> \"recordVersion\": \"123\", \"sign\": \"123\"}",
> "source": {
> "ts_ms": 0,
> "snapshot": "true"
> },
> "ns": {
> "db": "amp_test",
> "coll": "TopTransOrder"
> },
> "to": null,
> "documentKey": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}}",
> "updateDescription": null,
> "clusterTime": null,
> "txnNumber": null,
> "lsid": null
> }
>
> [2]
> {"data":[{"_id":"614a9b3769736f5fcc492613","id":null,"reqSeqId":"123","ordId":"1440510124339011584","outTransId":"123","merOrdId":"123","hfSeqId":"123","partyOrderId":"123","bankSeqId":"123","orgOrdId":"123","orgTermOrdId":"123","orgHuifuSeqId":"123","transDate":"20210913","productId":"app3","serviceId":"6767679","topAgentId":"123","belongAgentId":"123","chainsId":"123","huifuId":"66668455531","transMajorCategory":"123","consoleActualPayChannel":"123","consolePayType":"123","consolePreAuthFlag":"123","consoleSubsidyFlag":"123","consoleDcType":"123","consoleAcctDivFlag":"123","actualPayChannel":"123","payChannel":"123","transType":"123","payType":"123","dcType":"123","isAcctDiv":"123","isDelayAcct":"123","creditType":"123","devsId":"123","ordAmt":123.32,"feeAmt":123,"actOrdAmt":123,"actualRefAmt":123,"refAmt":123,"refFeeAmt":123,"subsidyAmt":123,"subsidyRefAmt":123,"payCardId":"123","feeRecType":"123","feeFlag":"123","transStat":"S","transFinishTime":"123","tableName":"123","offset":"123","recordVersion":"123","sign":"123","synModifyTime":null,"merName":null,"bankName":null,"bankRespDesc":null,"bagentId":null,"bankId":null,"cashRespDesc":null,"reqDate":null,"accSplitBunch":null,"acctId":null,"fqFeeAmt":null,"payCardIdEnc":null,"goodsDesc":null,"remark":null,"synTtlDate":null,"outOrdId":null,"devType":null,"feeHuifuId":null,"feeAcctId":null,"orgTransDate":null,"orgOrdAmt":null,"orgCreateTime":null,"userType":null,"userId":null,"userIdExt":null,"settleAmt":null,"refCnt":null,"consoleCountSum":null,"topConsolePayType":null,"orgMerOrdId":null,"feeAllowanceFlag":null,"correctStat":null,"addedOrgFeeAmt":null,"discountFeeAmt":null,"acctFinishTime":null,"pospSeqId":null,"outOrderId":null,"cashTransId":null,"orgPayType":null,"orgPayChannel":null,"branch1HuifuId":null,"branch2HuifuId":null,"branch3HuifuId":null,"branch4HuifuId":null,"branch5HuifuId":null,"branchHuifuId":null,"level":null,"branchChannelId":null,"orgFeeAmt":null,"orgConsoleIsFq":null,"orgCreditType":null,"fqMerDiscountFlag":null,"payScene":null,"labels":null,"orgTransType":null,"orgFeeRecType":null,"orgFeeFlag":null,"orgDiscountFeeAmt":null,"merOperId":null,"operType":null,"batchId":null,"authNo":null,"refNum":null,"bankMerId":null,"bankMerName":null,"posMerId":null,"posMerName":null,"acqrInstId":null,"doubleExempt":null,"pnrDevId":null,"posTermId":null,"realPayType":null,"channelFinishTime":null,"transRefundBankId":null,"transRefundBankName":null,"orgRealPayType":null,"orgDevsId":null,"merPriv":null,"transRefundOutOrdId":null,"orgHfSeqId":null,"synMode":null,"cloudPay":null,"terminalReqDate":null,"terminalPayChannel":null,"huifuFstOrg":null,"huifuSecOrg":null,"huifuThdOrg":null,"huifuForOrg":null,"huifuSales":null,"partnerBd":null,"organizationId":null,"upperOrgId":null,"merOrg":null,"partnerInnerFstOrg":null,"partnerInnerSecOrg":null,"partnerInnerThdOrg":null,"partnerFstOrg":null,"partnerSecOrg":null,"partnerThdOrg":null,"collectMerFstOrg":null,"collectMerSecOrg":null,"collectMerThdOrg":null,"collectMerForOrg":null,"collectMerFivOrg":null,"collectMerSixOrg":null,"fullPath":null}],"type":"INSERT"}
>
> [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format
>
> At 2022-08-22 22:57:04, "Xuyang" <xyzhong...@163.com> wrote:
>> Hi, is your requirement "Debezium data" -> Flink -> "canal"?
>> If so, you could try a UDF [1].
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
>>
>> At 2022-08-21 10:49:29, "casel.chen" <casel_c...@126.com> wrote:
>>> How can Flink CDC convert the change records it captures into canal json format and write them to downstream Kafka?
>>> Flink CDC produces Debezium-format records (using JsonDebeziumDeserializationSchema). How can these be converted to canal json output? Are there any examples or key code snippets? Thanks!
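
To illustrate the conversion discussed in this thread, here is a minimal Python sketch (plain Python, not Flink code and not an official converter) that maps a MongoDB change event shaped like sample [1] onto a canal-json style envelope. The function name mongo_change_to_canal, the operation-type mapping, treating _id as the only primary key, and flattening only the top-level _id are all assumptions made for illustration. The sample event carries no before-image, so old is left as null, which is the gap raised in the question.

```python
import json

# Assumed mapping from MongoDB change-event operationType to a Canal event type.
# Treating "replace" as UPDATE is an illustrative choice, not a Canal rule.
OP_TO_CANAL_TYPE = {
    "insert": "INSERT",
    "update": "UPDATE",
    "replace": "UPDATE",
    "delete": "DELETE",
}


def mongo_change_to_canal(event_json: str) -> str:
    """Convert one change event (shaped like sample [1]) to a canal-json style string."""
    event = json.loads(event_json)
    canal_type = OP_TO_CANAL_TYPE[event["operationType"]]

    # fullDocument and documentKey are JSON strings nested inside the event.
    # Deletes have no fullDocument, so fall back to the document key.
    full_doc = event.get("fullDocument")
    row = json.loads(full_doc if full_doc else event["documentKey"])

    # Flatten the extended-JSON ObjectId at the top level only;
    # nested values such as {"$date": ...} are left untouched.
    doc_id = row.get("_id")
    if isinstance(doc_id, dict) and "$oid" in doc_id:
        row["_id"] = doc_id["$oid"]

    ns = event.get("ns") or {}
    source = event.get("source") or {}
    canal = {
        "data": [row],
        "old": None,  # no before-image is available in the sample event
        "type": canal_type,
        "database": ns.get("db"),
        "table": ns.get("coll"),
        "pkNames": ["_id"],  # assumption: _id is the only key
        "isDdl": False,
        "es": source.get("ts_ms"),
    }
    return json.dumps(canal)
```

In a Flink job, logic like this would sit inside a UDF or a custom serialization step; how the missing schema and old values are filled in depends on what the downstream consumer actually requires.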