Hi, 关于 Json 的解析,当你的 Json 里面的一个字段一个镶嵌类型的话,可以将其定义为一个 row,row 里面还可以定义 row 字段。
注意 row 里面的字段名称要和原始json 里面的字段一致。 Best, LakeShen claylin <[email protected]> 于2020年5月26日周二 上午10:17写道: > 嗯 谢谢 我试下看下 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<[email protected]>; > 发送时间: 2020年5月26日(星期二) 上午10:09 > 收件人: "user-zh"<[email protected]>; > > 主题: Re: Flink SQL 嵌套 nested Json 解析 > > > > 嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。 > > claylin <[email protected]> 于2020年5月26日周二 上午10:07写道: > > > 这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗 > > > > > > create table my_source ( > > &nbsp; database varchar, > > &nbsp; maxwell_ts bigint, > > &nbsp; table varchar, > > &nbsp; data row< > > &nbsp;&nbsp;&nbsp; transaction_sn varchar, > > &nbsp;&nbsp;&nbsp; parent_id int, > > &nbsp;&nbsp;&nbsp; user_id int, > > &nbsp;&nbsp;&nbsp; amount int, > > &nbsp;&nbsp;&nbsp; reference_id varchar, > > &nbsp;&nbsp;&nbsp; status int, > > &nbsp;&nbsp;&nbsp; transaction_type int, > > &nbsp;&nbsp;&nbsp; merchant_id int, > > &nbsp;&nbsp;&nbsp; update_time int, > > &nbsp;&nbsp;&nbsp; create_time int > > &nbsp; &nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS > > TIMESTAMP(3)),&nbsp; // 定义事件时间 > > &nbsp; &nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE > > &nbsp; &gt; > > ) with ( > > &nbsp;&nbsp;&nbsp; ... > > ) > > > > > > 这样可以行吗 > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Benchao Li"<[email protected]&gt;; > > 发送时间:&nbsp;2020年5月26日(星期二) 上午9:55 > > 收件人:&nbsp;"user-zh"<[email protected]&gt;; > > > > 主题:&nbsp;Re: Flink SQL 嵌套 nested Json 解析 > > > > > > > > Hi, > > > > 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子: > > create table my_source ( > > &nbsp; database varchar, > > &nbsp; maxwell_ts bigint, > > &nbsp; table varchar, > > &nbsp; data row< > > &nbsp;&nbsp;&nbsp; transaction_sn varchar, > > &nbsp;&nbsp;&nbsp; parent_id int, > > &nbsp;&nbsp;&nbsp; user_id int, > > &nbsp;&nbsp;&nbsp; amount int, > > &nbsp;&nbsp;&nbsp; reference_id varchar, > > &nbsp;&nbsp;&nbsp; status int, > > &nbsp;&nbsp;&nbsp; transaction_type int, > > &nbsp;&nbsp;&nbsp; merchant_id int, > > &nbsp;&nbsp;&nbsp; update_time int, > > &nbsp;&nbsp;&nbsp; create_time int > > &nbsp; &gt; > > ) with ( > > &nbsp;&nbsp;&nbsp; ... > > ) > > > > macia kk <[email protected]&gt; 于2020年5月26日周二 上午9:36写道: > > > > &gt; Flink version: 1.10 > > &gt; > > &gt; Json: > > &gt; > > &gt; { > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db", > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "maxwell_ts":1590416550358000, > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "table":"transaction_tab", > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{ > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "transaction_sn":"8888", > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "parent_id":0, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "user_id":333, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "amount":555, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "reference_id":"666", > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "status":3, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "transaction_type":3, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "merchant_id":2, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "update_time":1590416550, > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "create_time":1590416550 > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; }} > > &gt; > > &gt; > > &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > &gt; > > &gt; > > &gt; macia kk <[email protected]&gt; 于2020年5月26日周二 上午9:34写道: > > &gt; > > &gt; &gt; Flink version: 1.10 > > &gt; &gt; > > &gt; &gt; Json: > > &gt; &gt; ```j > > &gt; &gt; { > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "database":"main_db", > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "maxwell_ts":1590416550358000, > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; > "table":"transaction_tab", > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{ > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "transaction_sn":"8888", > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "parent_id":0, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "user_id":333, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "amount":555, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "reference_id":"666", > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "status":3, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "transaction_type":3, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "merchant_id":2, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "update_time":1590416550, > > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > "create_time":1590416550 > > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; } > > &gt; &gt; } > > &gt; &gt; ``` > > &gt; &gt; > > &gt; &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame > > &gt; &gt; > > &gt; &gt; > > &gt; &gt; > > &gt; &gt; Leonard Xu <[email protected]&gt; > 于2020年5月26日周二 上午8:58写道: > > &gt; &gt; > > &gt; &gt;&gt; Hi, kk > > &gt; &gt;&gt; > > &gt; &gt;&gt; > 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 > > &gt; &gt;&gt; > > &gt; &gt;&gt; > > &gt; &gt;&gt; 祝好, > > &gt; &gt;&gt; Leonard Xu > > &gt; &gt;&gt; > > &gt; &gt;&gt; > > &gt; &gt;&gt; &gt; 在 2020年5月26日,01:26,macia kk < > [email protected]&gt; 写道: > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; 有哪位大佬帮我看下,谢谢 > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; 尝试了很久,还是无法解析嵌套结构的Json > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; Error > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; Caused by: > > org.apache.flink.table.api.ValidationException: SQL > > &gt; &gt;&gt; &gt; validation failed. From line 4, > column 9 to line 4, > > column 31: Column > > &gt; &gt;&gt; &gt; 'data.transaction_type' not found > in any table > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > > &gt; &gt;&gt; > > &gt; > > > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > java.lang.reflect.Method.invoke(Method.java:498) > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > > &gt; &gt;&gt; > > &gt; > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; 嵌套Json 定义的 format 和 schema 如下: > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withFormat(new > Json() > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .jsonSchema( > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > """{type: 'object', > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp; > > properties: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; database: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'string' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; table: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'string' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxwell_ts: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; data: { > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'object', > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > properties :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > reference_id :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'string' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > transaction_type :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > merchant_id :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > create_time :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > }, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > status :{ > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > type: 'integer' > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > } > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > |&nbsp;&nbsp; } > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > | } > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > """.stripMargin.replaceAll("\n", " ") > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > ) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withSchema(new > Schema() > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > .field("table", STRING()) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > .field("database", STRING()) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > .field("data", ROW(FIELD("reference_id",STRING()), > > &gt; &gt;&gt; &gt; FIELD("transaction_type",INT()), > > FIELD("merchant_id",INT()), > > &gt; &gt;&gt; &gt; FIELD("status",INT()))) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > //.field("event_time", BIGINT()) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //&nbsp; > > .from("maxwell_ts") > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > //.rowtime(new Rowtime() > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //&nbsp; > > //.timestampsFromField("ts" * 1000) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //&nbsp; > > .timestampsFromField("ts") > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //&nbsp; > > .watermarksPeriodicBounded(60000) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //) > > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt; > > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; > bsTableEnv.sqlUpdate("""INSERT INTO > > yyyyy > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > | SELECT `table`, `database` > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.reference_id`, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.transaction_type`, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.merchant_id`, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.create_time`, > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > `data.status` > > &gt; &gt;&gt; > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > | FROM xxxx""".stripMargin) > > &gt; &gt;&gt; > > &gt; &gt;&gt; > > &gt; > > > > > > -- > > > > Best, > > Benchao Li > > > > -- > > Best, > Benchao Li
