?? ???? ??????????
------------------ ???????? ------------------ ??????: "Benchao Li"<[email protected]>; ????????: 2020??5??26??(??????) ????10:09 ??????: "user-zh"<[email protected]>; ????: Re: Flink SQL ???? nested Json ???? ??????????DDL??????WATERMARK???????????????????????????????????????????????? claylin <[email protected]> ??2020??5??26?????? ????10:07?????? > ??????????????????????row????????????????????event time???????????????????????????????????? > > > create table my_source ( > &nbsp; database varchar, > &nbsp; maxwell_ts bigint, > &nbsp; table varchar, > &nbsp; data row< > &nbsp;&nbsp;&nbsp; transaction_sn varchar, > &nbsp;&nbsp;&nbsp; parent_id int, > &nbsp;&nbsp;&nbsp; user_id int, > &nbsp;&nbsp;&nbsp; amount int, > &nbsp;&nbsp;&nbsp; reference_id varchar, > &nbsp;&nbsp;&nbsp; status int, > &nbsp;&nbsp;&nbsp; transaction_type int, > &nbsp;&nbsp;&nbsp; merchant_id int, > &nbsp;&nbsp;&nbsp; update_time int, > &nbsp;&nbsp;&nbsp; create_time int > &nbsp; &nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS > TIMESTAMP(3)),&nbsp; // ???????????? > &nbsp; &nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE > &nbsp; &gt; > ) with ( > &nbsp;&nbsp;&nbsp; ... > ) > > > ???????????? > > > ------------------&nbsp;????????&nbsp;------------------ > ??????:&nbsp;"Benchao Li"<[email protected]&gt;; > ????????:&nbsp;2020??5??26??(??????) ????9:55 > ??????:&nbsp;"user-zh"<[email protected]&gt;; > > ????:&nbsp;Re: Flink SQL ???? nested Json ???? > > > > Hi?? > > ????????????????????DDL??????source??format??????????????????????????DDL ?????????????????? > create table my_source ( > &nbsp; database varchar, > &nbsp; maxwell_ts bigint, > &nbsp; table varchar, > &nbsp; data row< > &nbsp;&nbsp;&nbsp; transaction_sn varchar, > &nbsp;&nbsp;&nbsp; parent_id int, > &nbsp;&nbsp;&nbsp; user_id int, > &nbsp;&nbsp;&nbsp; amount int, > &nbsp;&nbsp;&nbsp; reference_id varchar, > &nbsp;&nbsp;&nbsp; status int, > &nbsp;&nbsp;&nbsp; transaction_type int, > &nbsp;&nbsp;&nbsp; merchant_id int, > &nbsp;&nbsp;&nbsp; update_time int, > &nbsp;&nbsp;&nbsp; create_time int > &nbsp; &gt; > ) with ( > &nbsp;&nbsp;&nbsp; ... > ) > > macia kk <[email protected]&gt; ??2020??5??26?????? ????9:36?????? > > &gt; Flink version: 1.10 > &gt; > &gt; Json: > &gt; > &gt; { > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db", > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000, > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab", > &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{ > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "transaction_sn":"8888", > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "reference_id":"666", > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_type":3, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "update_time":1590416550, > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "create_time":1590416550 > &gt;&nbsp;&nbsp;&nbsp;&nbsp; }} > &gt; > &gt; > &gt; ????????????????????json???????? jsonSchema ??????Sechame > &gt; > &gt; > &gt; macia kk <[email protected]&gt; ??2020??5??26?????? ????9:34?????? > &gt; > &gt; &gt; Flink version: 1.10 > &gt; &gt; > &gt; &gt; Json: > &gt; &gt; ```j > &gt; &gt; { > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db", > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab", > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{ > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "transaction_sn":"8888", > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "reference_id":"666", > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "transaction_type":3, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "update_time":1590416550, > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > "create_time":1590416550 > &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; } > &gt; &gt; } > &gt; &gt; ``` > &gt; &gt; > &gt; &gt; ????????????????????json???????? jsonSchema ??????Sechame > &gt; &gt; > &gt; &gt; > &gt; &gt; > &gt; &gt; Leonard Xu <[email protected]&gt; ??2020??5??26?????? ????8:58?????? > &gt; &gt; > &gt; &gt;&gt; Hi, kk > &gt; &gt;&gt; > &gt; &gt;&gt; ??????flink????????????1.10????????????format????????????????json?????????????????? > &gt; &gt;&gt; > &gt; &gt;&gt; > &gt; &gt;&gt; ?????? > &gt; &gt;&gt; Leonard Xu > &gt; &gt;&gt; > &gt; &gt;&gt; > &gt; &gt;&gt; &gt; ?? 2020??5??26????01:26??macia kk <[email protected]&gt; ?????? > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; ???????????????????????? > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; ??????????????????????????????????Json > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; Error > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; Caused by: > org.apache.flink.table.api.ValidationException: SQL > &gt; &gt;&gt; &gt; validation failed. From line 4, column 9 to line 4, > column 31: Column > &gt; &gt;&gt; &gt; 'data.transaction_type' not found in any table > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > &gt; &gt;&gt; > &gt; > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > java.lang.reflect.Method.invoke(Method.java:498) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at > &gt; &gt;&gt; > &gt; > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; ????Json ?????? format ?? schema ?????? > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withFormat(new Json() > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .jsonSchema( > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > """{type: 'object', > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp; > properties: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; database: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; table: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxwell_ts: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; data: { > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'object', > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > reference_id :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'string' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > transaction_type :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > merchant_id :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > create_time :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > }, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > status :{ > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > type: 'integer' > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp; } > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; | } > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > """.stripMargin.replaceAll("\n", " ") > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withSchema(new Schema() > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .field("table", STRING()) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .field("database", STRING()) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .field("data", ROW(FIELD("reference_id",STRING()), > &gt; &gt;&gt; &gt; FIELD("transaction_type",INT()), > FIELD("merchant_id",INT()), > &gt; &gt;&gt; &gt; FIELD("status",INT()))) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //.field("event_time", BIGINT()) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; > .from("maxwell_ts") > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > //.rowtime(new Rowtime() > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; > //.timestampsFromField("ts" * 1000) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; > .timestampsFromField("ts") > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; > .watermarksPeriodicBounded(60000) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //) > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt; > &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; bsTableEnv.sqlUpdate("""INSERT INTO > yyyyy > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > | SELECT `table`, `database` > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.reference_id`, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.transaction_type`, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.merchant_id`, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.create_time`, > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.status` > &gt; &gt;&gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > | FROM xxxx""".stripMargin) > &gt; &gt;&gt; > &gt; &gt;&gt; > &gt; > > > -- > > Best, > Benchao Li -- Best, Benchao Li
