??????????????????????row????????????????????event 
time????????????????????????????????????


create table my_source (
  database varchar,
  maxwell_ts bigint,
  table varchar,
&nbsp; data row<
&nbsp;&nbsp;&nbsp; transaction_sn varchar,
&nbsp;&nbsp;&nbsp; parent_id int,
&nbsp;&nbsp;&nbsp; user_id int,
&nbsp;&nbsp;&nbsp; amount int,
&nbsp;&nbsp;&nbsp; reference_id varchar,
&nbsp;&nbsp;&nbsp; status int,
&nbsp;&nbsp;&nbsp; transaction_type int,
&nbsp;&nbsp;&nbsp; merchant_id int,
&nbsp;&nbsp;&nbsp; update_time int,
&nbsp;&nbsp;&nbsp; create_time int
&nbsp; &nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS TIMESTAMP(3)),&nbsp; // 
????????????
&nbsp; &nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
&nbsp; &gt;
) with (
&nbsp;&nbsp;&nbsp; ...
)


????????????


------------------&nbsp;????????&nbsp;------------------
??????:&nbsp;"Benchao Li"<[email protected]&gt;;
????????:&nbsp;2020??5??26??(??????) ????9:55
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re: Flink SQL ???? nested Json ????



Hi??

????????????????????DDL??????source??format??????????????????????????DDL 
??????????????????
create table my_source (
&nbsp; database varchar,
&nbsp; maxwell_ts bigint,
&nbsp; table varchar,
&nbsp; data row<
&nbsp;&nbsp;&nbsp; transaction_sn varchar,
&nbsp;&nbsp;&nbsp; parent_id int,
&nbsp;&nbsp;&nbsp; user_id int,
&nbsp;&nbsp;&nbsp; amount int,
&nbsp;&nbsp;&nbsp; reference_id varchar,
&nbsp;&nbsp;&nbsp; status int,
&nbsp;&nbsp;&nbsp; transaction_type int,
&nbsp;&nbsp;&nbsp; merchant_id int,
&nbsp;&nbsp;&nbsp; update_time int,
&nbsp;&nbsp;&nbsp; create_time int
&nbsp; &gt;
) with (
&nbsp;&nbsp;&nbsp; ...
)

macia kk <[email protected]&gt; ??2020??5??26?????? ????9:36??????

&gt; Flink version: 1.10
&gt;
&gt; Json:
&gt;
&gt; {
&gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db",
&gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab",
&gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_sn":"8888",
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "reference_id":"666",
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_type":3,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "update_time":1590416550,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "create_time":1590416550
&gt;&nbsp;&nbsp;&nbsp;&nbsp; }}
&gt;
&gt;
&gt; ????????????????????json???????? jsonSchema ??????Sechame
&gt;
&gt;
&gt; macia kk <[email protected]&gt; ??2020??5??26?????? ????9:34??????
&gt;
&gt; &gt; Flink version: 1.10
&gt; &gt;
&gt; &gt; Json:
&gt; &gt; ```j
&gt; &gt; {
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db",
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab",
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
"transaction_sn":"8888",
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "reference_id":"666",
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_type":3,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
"update_time":1590416550,
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
"create_time":1590416550
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt; &gt; }
&gt; &gt; ```
&gt; &gt;
&gt; &gt; ????????????????????json???????? jsonSchema ??????Sechame
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; Leonard Xu <[email protected]&gt; ??2020??5??26?????? ????8:58??????
&gt; &gt;
&gt; &gt;&gt; Hi, kk
&gt; &gt;&gt;
&gt; &gt;&gt; 
??????flink????????????1.10????????????format????????????????json??????????????????
&gt; &gt;&gt;
&gt; &gt;&gt;
&gt; &gt;&gt; ??????
&gt; &gt;&gt; Leonard Xu
&gt; &gt;&gt;
&gt; &gt;&gt;
&gt; &gt;&gt; &gt; ?? 2020??5??26????01:26??macia kk <[email protected]&gt; 
??????
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; ????????????????????????
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; ??????????????????????????????????Json
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; Error
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; Caused by: org.apache.flink.table.api.ValidationException: 
SQL
&gt; &gt;&gt; &gt; validation failed. From line 4, column 9 to line 4, column 
31: Column
&gt; &gt;&gt; &gt; 'data.transaction_type' not found in any table
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
&gt; &gt;&gt;
&gt; 
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at 
java.lang.reflect.Method.invoke(Method.java:498)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
&gt; &gt;&gt;
&gt; 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt; ????Json ?????? format ?? schema ??????
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withFormat(new Json()
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .jsonSchema(
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; """{type: 
'object',
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp; properties: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; database: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; table: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxwell_ts: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; data: {
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'object',
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
reference_id :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 type: 'string'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
},
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
transaction_type :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
},
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
merchant_id :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
},
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
create_time :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
},
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
status :{
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 type: 'integer'
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
}
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
|&nbsp;&nbsp; }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
| }
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
""".stripMargin.replaceAll("\n", " ")
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withSchema(new Schema()
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .field("table", 
STRING())
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .field("database", 
STRING())
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .field("data", 
ROW(FIELD("reference_id",STRING()),
&gt; &gt;&gt; &gt; FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
&gt; &gt;&gt; &gt; FIELD("status",INT())))
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
//.field("event_time", BIGINT())
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; 
.from("maxwell_ts")
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //.rowtime(new 
Rowtime()
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; 
//.timestampsFromField("ts" * 1000)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; 
.timestampsFromField("ts")
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp; 
.watermarksPeriodicBounded(60000)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //)
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;
&gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
&gt; &gt;&gt; 
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 | SELECT `table`, `database`
&gt; &gt;&gt; 
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.reference_id`,
&gt; &gt;&gt; 
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.transaction_type`,
&gt; &gt;&gt; 
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.merchant_id`,
&gt; &gt;&gt; 
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.create_time`,
&gt; &gt;&gt; 
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.status`
&gt; &gt;&gt; 
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 | FROM xxxx""".stripMargin)
&gt; &gt;&gt;
&gt; &gt;&gt;
&gt;


-- 

Best,
Benchao Li

回复