嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。

claylin <[email protected]> 于2020年5月26日周二 上午10:07写道:

> 这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗
>
>
> create table my_source (
> &nbsp; database varchar,
> &nbsp; maxwell_ts bigint,
> &nbsp; table varchar,
> &nbsp; data row<
> &nbsp;&nbsp;&nbsp; transaction_sn varchar,
> &nbsp;&nbsp;&nbsp; parent_id int,
> &nbsp;&nbsp;&nbsp; user_id int,
> &nbsp;&nbsp;&nbsp; amount int,
> &nbsp;&nbsp;&nbsp; reference_id varchar,
> &nbsp;&nbsp;&nbsp; status int,
> &nbsp;&nbsp;&nbsp; transaction_type int,
> &nbsp;&nbsp;&nbsp; merchant_id int,
> &nbsp;&nbsp;&nbsp; update_time int,
> &nbsp;&nbsp;&nbsp; create_time int
> &nbsp; &nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS
> TIMESTAMP(3)),&nbsp; // 定义事件时间
> &nbsp; &nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
> &nbsp; &gt;
> ) with (
> &nbsp;&nbsp;&nbsp; ...
> )
>
>
> 这样可以行吗
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<[email protected]&gt;;
> 发送时间:&nbsp;2020年5月26日(星期二) 上午9:55
> 收件人:&nbsp;"user-zh"<[email protected]&gt;;
>
> 主题:&nbsp;Re: Flink SQL 嵌套 nested Json 解析
>
>
>
> Hi,
>
> 你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子:
> create table my_source (
> &nbsp; database varchar,
> &nbsp; maxwell_ts bigint,
> &nbsp; table varchar,
> &nbsp; data row<
> &nbsp;&nbsp;&nbsp; transaction_sn varchar,
> &nbsp;&nbsp;&nbsp; parent_id int,
> &nbsp;&nbsp;&nbsp; user_id int,
> &nbsp;&nbsp;&nbsp; amount int,
> &nbsp;&nbsp;&nbsp; reference_id varchar,
> &nbsp;&nbsp;&nbsp; status int,
> &nbsp;&nbsp;&nbsp; transaction_type int,
> &nbsp;&nbsp;&nbsp; merchant_id int,
> &nbsp;&nbsp;&nbsp; update_time int,
> &nbsp;&nbsp;&nbsp; create_time int
> &nbsp; &gt;
> ) with (
> &nbsp;&nbsp;&nbsp; ...
> )
>
> macia kk <[email protected]&gt; 于2020年5月26日周二 上午9:36写道:
>
> &gt; Flink version: 1.10
> &gt;
> &gt; Json:
> &gt;
> &gt; {
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db",
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab",
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "transaction_sn":"8888",
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "reference_id":"666",
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "transaction_type":3,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "update_time":1590416550,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "create_time":1590416550
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; }}
> &gt;
> &gt;
> &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
> &gt;
> &gt;
> &gt; macia kk <[email protected]&gt; 于2020年5月26日周二 上午9:34写道:
> &gt;
> &gt; &gt; Flink version: 1.10
> &gt; &gt;
> &gt; &gt; Json:
> &gt; &gt; ```j
> &gt; &gt; {
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "database":"main_db",
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "maxwell_ts":1590416550358000,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "table":"transaction_tab",
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; "data":{
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "transaction_sn":"8888",
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "parent_id":0,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user_id":333,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "amount":555,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "reference_id":"666",
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "status":3,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "transaction_type":3,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "merchant_id":2,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "update_time":1590416550,
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "create_time":1590416550
> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; }
> &gt; &gt; }
> &gt; &gt; ```
> &gt; &gt;
> &gt; &gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; Leonard Xu <[email protected]&gt; 于2020年5月26日周二 上午8:58写道:
> &gt; &gt;
> &gt; &gt;&gt; Hi, kk
> &gt; &gt;&gt;
> &gt; &gt;&gt; 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
> &gt; &gt;&gt;
> &gt; &gt;&gt;
> &gt; &gt;&gt; 祝好,
> &gt; &gt;&gt; Leonard Xu
> &gt; &gt;&gt;
> &gt; &gt;&gt;
> &gt; &gt;&gt; &gt; 在 2020年5月26日,01:26,macia kk <[email protected]&gt; 写道:
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; 有哪位大佬帮我看下,谢谢
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; 尝试了很久,还是无法解析嵌套结构的Json
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; Error
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; Caused by:
> org.apache.flink.table.api.ValidationException: SQL
> &gt; &gt;&gt; &gt; validation failed. From line 4, column 9 to line 4,
> column 31: Column
> &gt; &gt;&gt; &gt; 'data.transaction_type' not found in any table
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> &gt; &gt;&gt;
> &gt;
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> java.lang.reflect.Method.invoke(Method.java:498)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; at
> &gt; &gt;&gt;
> &gt;
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt; 嵌套Json 定义的 format 和 schema 如下:
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withFormat(new Json()
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .jsonSchema(
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> """{type: 'object',
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; |&nbsp;
> properties: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; database: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; table: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'string'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxwell_ts: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; data: {
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type: 'object',
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> reference_id :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'string'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> transaction_type :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> merchant_id :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> create_time :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> },
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> status :{
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> type: 'integer'
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> }
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp; }
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; | }
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> """.stripMargin.replaceAll("\n", " ")
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .withSchema(new Schema()
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field("table", STRING())
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field("database", STRING())
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field("data", ROW(FIELD("reference_id",STRING()),
> &gt; &gt;&gt; &gt; FIELD("transaction_type",INT()),
> FIELD("merchant_id",INT()),
> &gt; &gt;&gt; &gt; FIELD("status",INT())))
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> //.field("event_time", BIGINT())
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;
> .from("maxwell_ts")
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> //.rowtime(new Rowtime()
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;
> //.timestampsFromField("ts" * 1000)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;
> .timestampsFromField("ts")
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;
> .watermarksPeriodicBounded(60000)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //)
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;
> &gt; &gt;&gt; &gt;&nbsp;&nbsp;&nbsp; bsTableEnv.sqlUpdate("""INSERT INTO
> yyyyy
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> | SELECT `table`, `database`
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.reference_id`,
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.transaction_type`,
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.merchant_id`,
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.create_time`,
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `data.status`
> &gt; &gt;&gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> | FROM xxxx""".stripMargin)
> &gt; &gt;&gt;
> &gt; &gt;&gt;
> &gt;
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li

回复