?? ???? ??????????



------------------ ???????? ------------------
??????:&nbsp;"Benchao Li"<[email protected]&gt;;
????????:&nbsp;2020??5??26??(??????) ????10:09
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re: Flink SQL ???? nested Json ????



??????????DDL??????WATERMARK????????????????????????????????????????????????

claylin <[email protected]&gt; ??2020??5??26?????? ????10:07??????

&gt; ??????????????????????row????????????????????event 
time????????????????????????????????????
&gt;
&gt;
&gt; create table my_source (
&gt; &amp;nbsp; database varchar,
&gt; &amp;nbsp; maxwell_ts bigint,
&gt; &amp;nbsp; table varchar,
&gt; &amp;nbsp; data row<
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_sn varchar,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; parent_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; user_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; amount int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; reference_id varchar,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; status int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_type int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; merchant_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; update_time int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; create_time int
&gt; &amp;nbsp; &amp;nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS
&gt; TIMESTAMP(3)),&amp;nbsp; // ????????????
&gt; &amp;nbsp; &amp;nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
&gt; &amp;nbsp; &amp;gt;
&gt; ) with (
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; ...
&gt; )
&gt;
&gt;
&gt; ????????????
&gt;
&gt;
&gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt; ??????:&amp;nbsp;"Benchao Li"<[email protected]&amp;gt;;
&gt; ????????:&amp;nbsp;2020??5??26??(??????) ????9:55
&gt; ??????:&amp;nbsp;"user-zh"<[email protected]&amp;gt;;
&gt;
&gt; ????:&amp;nbsp;Re: Flink SQL ???? nested Json ????
&gt;
&gt;
&gt;
&gt; Hi??
&gt;
&gt; ????????????????????DDL??????source??format??????????????????????????DDL 
??????????????????
&gt; create table my_source (
&gt; &amp;nbsp; database varchar,
&gt; &amp;nbsp; maxwell_ts bigint,
&gt; &amp;nbsp; table varchar,
&gt; &amp;nbsp; data row<
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_sn varchar,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; parent_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; user_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; amount int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; reference_id varchar,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; status int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; transaction_type int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; merchant_id int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; update_time int,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; create_time int
&gt; &amp;nbsp; &amp;gt;
&gt; ) with (
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; ...
&gt; )
&gt;
&gt; macia kk <[email protected]&amp;gt; ??2020??5??26?????? ????9:36??????
&gt;
&gt; &amp;gt; Flink version: 1.10
&gt; &amp;gt;
&gt; &amp;gt; Json:
&gt; &amp;gt;
&gt; &amp;gt; {
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "database":"main_db",
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
"maxwell_ts":1590416550358000,
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "table":"transaction_tab",
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "data":{
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "transaction_sn":"8888",
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "parent_id":0,
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "user_id":333,
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "amount":555,
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "reference_id":"666",
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "status":3,
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "transaction_type":3,
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "merchant_id":2,
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "update_time":1590416550,
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "create_time":1590416550
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }}
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; ????????????????????json???????? jsonSchema ??????Sechame
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; macia kk <[email protected]&amp;gt; ??2020??5??26?????? 
????9:34??????
&gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Flink version: 1.10
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Json:
&gt; &amp;gt; &amp;gt; ```j
&gt; &amp;gt; &amp;gt; {
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
"database":"main_db",
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
"maxwell_ts":1590416550358000,
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
"table":"transaction_tab",
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "data":{
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "transaction_sn":"8888",
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "parent_id":0,
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "user_id":333,
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "amount":555,
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "reference_id":"666",
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "status":3,
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "transaction_type":3,
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 "merchant_id":2,
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "update_time":1590416550,
&gt; &amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; "create_time":1590416550
&gt; &amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; &amp;gt; &amp;gt; }
&gt; &amp;gt; &amp;gt; ```
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; ????????????????????json???????? jsonSchema ??????Sechame
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Leonard Xu <[email protected]&amp;gt; 
??2020??5??26?????? ????8:58??????
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; Hi, kk
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; 
??????flink????????????1.10????????????format????????????????json??????????????????
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; ??????
&gt; &amp;gt; &amp;gt;&amp;gt; Leonard Xu
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; ?? 2020??5??26????01:26??macia kk 
<[email protected]&amp;gt; ??????
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; ????????????????????????
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; ??????????????????????????????????Json
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; Error
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; Caused by:
&gt; org.apache.flink.table.api.ValidationException: SQL
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; validation failed. From line 4, column 
9 to line 4,
&gt; column 31: Column
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; 'data.transaction_type' not found in 
any table
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; ????Json ?????? format ?? schema ??????
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .withFormat(new 
Json()
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
.jsonSchema(
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; """{type: 'object',
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 |&amp;nbsp;
&gt; properties: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; database: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 type: 'string'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; table: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 type: 'string'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; maxwell_ts: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; data: {
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 type: 'object',
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 properties :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; reference_id :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'string'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; transaction_type :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; merchant_id :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; create_time :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; },
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; status :{
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; type: 'integer'
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; }
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
|&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 }
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp; }
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 | }
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; """.stripMargin.replaceAll("\n", " ")
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 )
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; )
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .withSchema(new 
Schema()
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field("table", STRING())
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field("database", STRING())
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field("data", ROW(FIELD("reference_id",STRING()),
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; FIELD("transaction_type",INT()),
&gt; FIELD("merchant_id",INT()),
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt; FIELD("status",INT())))
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; //.field("event_time", BIGINT())
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
//&amp;nbsp;
&gt; .from("maxwell_ts")
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; //.rowtime(new Rowtime()
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
//&amp;nbsp;
&gt; //.timestampsFromField("ts" * 1000)
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
//&amp;nbsp;
&gt; .timestampsFromField("ts")
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
//&amp;nbsp;
&gt; .watermarksPeriodicBounded(60000)
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
//)
&gt; &amp;gt; &amp;gt;&amp;gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; )
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
bsTableEnv.sqlUpdate("""INSERT INTO
&gt; yyyyy
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; | SELECT `table`, `database`
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
`data.reference_id`,
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
`data.transaction_type`,
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
`data.merchant_id`,
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
`data.create_time`,
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; |&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
`data.status`
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; | FROM xxxx""".stripMargin)
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt; &amp;gt;&amp;gt;
&gt; &amp;gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Best,
&gt; Benchao Li



-- 

Best,
Benchao Li

回复