????????????????????????????????sql????????????????????????????????????????watermark????????????????????????????????flink????????




------------------ 原始邮件 ------------------
发件人: "user-zh" <[email protected]>;
发送时间: 2020年7月17日(星期五) 晚上8:33
收件人: "user-zh"<[email protected]>;

主题: Re: sql ????josn?????????? ????????????



??????????????????????????????????????????????????

claylin <[email protected]> 于2020年7月17日周五 下午8:28写道：

> hi all????????????????json????????????????????(ts AS 
CAST(FROM_UNIXTIME(hiido_time) AS
> TIMESTAMP(3)),????????)??????????????????
> create table hiido_push_sdk_mq (
> datas   ARRAY<ROW<`from` string,hdid string,event
> string,hiido_time bigint,ts AS CAST(FROM_UNIXTIME(hiido_time) AS
> TIMESTAMP(3)),WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>
> ) with (
> 'connector' = 'kafka',
> 'topic' = 'hiido_pushsdk_event',
> 'properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,
> kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103',
> 'properties.group.id' = 'push_click_sql_version_consumer',
> 'scan.startup.mode' = 'latest-offset',
> 'format.type' = 'json');
>
>
>
>
> ??????????
> [ERROR] 2020-07-17 20:17:50,640(562284338) --> 
[http-nio-8080-exec-10]
> 
com.yy.push.flink.sql.gateway.sql.parse.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:77):
> parseBySqlParser, parse:
> com.yy.push.flink.sql.gateway.context.JobContext$1@5d5f32d1, stmt: create
> table hiido_push_sdk_mq (    datas  
 ARRAY<ROW<`from`
> string,hdid string,event string,hiido_time bigint,ts AS
> CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),WATERMARK FOR ts AS ts -
> INTERVAL '5' MINUTE>>) with ('connector' = 'kafka','topic' =
> 'hiido_pushsdk_event','properties.bootstrap.servers' = '
> kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,
> kafkafs002-core003.yy.com:8103','properties.group.id' =
> 'push_click_sql_version_consumer','scan.startup.mode' =
> 'latest-offset','format.type' = 'json'), error info: SQL parse failed.
> Encountered "AS" at line 1, column 115.
> Was expecting one of:
>     "ROW" ...
>     <BRACKET_QUOTED_IDENTIFIER> ...
>     <QUOTED_IDENTIFIER> ...
>     <BACK_QUOTED_IDENTIFIER> ...
>     <IDENTIFIER> ...
>     <UNICODE_QUOTED_IDENTIFIER> ...
>     "STRING" ...
>     "BYTES" ...
>     "ARRAY" ...
>     "MULTISET" ...
>     "RAW" ...
>     "BOOLEAN" ...
>     "INTEGER" ...
>     "INT" ...
>     "TINYINT" ...
>     "SMALLINT" ...
>     "BIGINT" ...
>     "REAL" ...
>     "DOUBLE" ...
>     "FLOAT" ...
>     "BINARY" ...
>     "VARBINARY" ...
>     "DECIMAL" ...
>     "DEC" ...
>     "NUMERIC" ...
>     "ANY" ...
>     "CHARACTER" ...
>     "CHAR" ...
>     "VARCHAR" ...
>     "DATE" ...
>     "TIME" ...
>     "TIMESTAMP" ...



-- 

Best,
Benchao Li

回复