我一开始想的是source表采用proctime as proctime() 
这样有了一个列,然后这个时间赋值给sink表的一个timestamp(3)列,group时候直接就可以用了。





------------------ Original ------------------
From: Benchao Li <[email protected]&gt;
Date: Thu,May 21,2020 9:17 PM
To: user-zh <[email protected]&gt;
Subject: Re: flink proctime error



看你提供的SQL来讲,你是直接在sink_table上做了一个窗口计算,而sink_table并没有定义时间属性。
(是不是笔误,应该是在source_table上做窗口计算?)

了不起的盖茨比 <[email protected]&gt; 于2020年5月21日周四 下午9:08写道:

&gt; error:Window aggregate can only be defined over a time attribute column,
&gt; but TIMESTAMP(3) encountered.
&gt; 如果在sink_table 添加watermark那么就不会报错,我在source定义了时间,sink接收了时间,为什么group
&gt; by时候会出现那个error?
&gt; CREATE TABLE source_table (
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; sip VARCHAR,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; proctime as proctime()
&gt; ) WITH (
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.version' = 
'universal',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 
'latest-offset',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.topic' = 
'skyeye-tcpflow',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
'connector.properties.group.id' = 'testGroup',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'update-mode' = 'append',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'format.type' = 'json',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 
'true'
&gt; );
&gt;
&gt;
&gt; CREATE TABLE sink_table (
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ip VARCHAR,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; proctime timestamp(3)
&gt;
&gt;
&gt; ) WITH (
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.version' = 
'universal',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 
'latest-offset',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.topic' = 
'ip_agg',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'update-mode' = 'append',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'format.type' = 'json',
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 
'true'
&gt; );
&gt;
&gt; insert into sink_kafka select sip,proctime from source_kafka;
&gt;
&gt;
&gt;
&gt; select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from
&gt; sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]

回复