So the point is that if a later computation uses the virtual column, it needs a watermark declared on it. OK, I tested that case and it works.





------------------ Original ------------------
From: Jingsong Li <[email protected]>
Date: Thu, May 21, 2020 9:22 PM
To: user-zh <[email protected]>
Subject: Re: flink proctime error



Hi,

- proctime is a virtual column.
- rowtime is a column that holds real data.

It looks like you need to define a rowtime in sink_table, for example like this:
CREATE TABLE sink_table (
    ip VARCHAR,
    proctime timestamp(3),
    WATERMARK FOR proctime AS proctime
    ....
)
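
To make the suggestion above concrete, here is a minimal sketch of a complete definition together with the window query. It assumes Flink 1.10 DDL syntax and simply reuses the connector options and the query from the quoted message, so treat it as an illustration rather than a verified setup. Note that once the watermark is declared, the `proctime` column of sink_table is an event-time (rowtime) attribute carrying the timestamps written by the upstream job, even though its name suggests processing time.

```sql
-- Sketch only: connector options are copied from the quoted message.
CREATE TABLE sink_table (
    ip VARCHAR,
    proctime TIMESTAMP(3),
    -- Declares proctime as the rowtime attribute, with a strictly
    -- ascending watermark (the watermark follows the largest timestamp seen).
    WATERMARK FOR proctime AS proctime
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.startup-mode' = 'latest-offset',
    'connector.topic' = 'ip_agg',
    'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
    'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

-- The window aggregate is now defined over a time attribute column.
SELECT TUMBLE_START(proctime, INTERVAL '10' MINUTE), COUNT(1)
FROM sink_table
GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE);
```

If records can arrive out of order, a bounded-delay watermark such as `WATERMARK FOR proctime AS proctime - INTERVAL '5' SECOND` may be the safer choice; the 5-second delay is only an illustrative value.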

Best,
Jingsong Lee

On Thu, May 21, 2020 at 9:17 PM Benchao Li <[email protected]> wrote:

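> Judging from the SQL you provided, you are running the window aggregation directly on sink_table, but sink_table does not define a time attribute.
> (Is that a typo? Did you mean to run the window aggregation on source_table?)
>
> 了不起的盖茨比 <[email protected]> wrote on Thu, May 21, 2020 at 9:08 PM: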
&gt;
> > error: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
> > If I add a watermark to sink_table, the error goes away. I defined the time attribute in the source and the sink receives that time, so why does the GROUP BY raise this error?
> > CREATE TABLE source_table (
> >     sip VARCHAR,
> >     proctime as proctime()
> > ) WITH (
> >     'connector.type' = 'kafka',
> >     'connector.version' = 'universal',
> >     'connector.startup-mode' = 'latest-offset',
> >     'connector.topic' = 'skyeye-tcpflow',
> >     'connector.properties.group.id' = 'testGroup',
> >     'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> >     'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> >     'update-mode' = 'append',
> >     'format.type' = 'json',
> >     'format.derive-schema' = 'true'
> > );
> >
> > CREATE TABLE sink_table (
> >     ip VARCHAR,
> >     proctime timestamp(3)
> > ) WITH (
> >     'connector.type' = 'kafka',
> >     'connector.version' = 'universal',
> >     'connector.startup-mode' = 'latest-offset',
> >     'connector.topic' = 'ip_agg',
> >     'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> >     'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> >     'update-mode' = 'append',
> >     'format.type' = 'json',
> >     'format.derive-schema' = 'true'
> > );
> >
> > insert into sink_kafka select sip,proctime from source_kafka;
> >
> > select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from
> > sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [email protected]; [email protected]
>


-- 
Best, Jingsong Lee
