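My original idea was this: the source table declares proctime as proctime(), which gives it a proctime column. That value is then written into a timestamp(3) column of the sink table, so I expected to use that column directly in the GROUP BY.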
------------------ Original ------------------
From: Benchao Li <[email protected]>
Date: Thu, May 21, 2020 9:17 PM
To: user-zh <[email protected]>
Subject: Re: flink proctime error

Judging from the SQL you provided, you are running the window aggregation directly on sink_table, but sink_table does not define a time attribute. (Is that a typo? Did you mean to run the window aggregation on source_table?)

了不起的盖茨比 <[email protected]> wrote on Thursday, May 21, 2020 at 9:08 PM:
> error: Window aggregate can only be defined over a time attribute column,
> but TIMESTAMP(3) encountered.
> If I add a watermark to sink_table, the error goes away. I defined the time
> in the source and the sink received that time, so why does this error occur
> in the GROUP BY?
>
> CREATE TABLE source_table (
>   sip VARCHAR,
>   proctime as proctime()
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.topic' = 'skyeye-tcpflow',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
>   'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
>   'update-mode' = 'append',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );
>
> CREATE TABLE sink_table (
>   ip VARCHAR,
>   proctime timestamp(3)
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.topic' = 'ip_agg',
>   'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
>   'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
>   'update-mode' = 'append',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );
>
> insert into sink_table select sip, proctime from source_table;
>
> select TUMBLE_START(proctime, INTERVAL '10' MINUTE), count(1)
> from sink_table
> group by TUMBLE(proctime, INTERVAL '10' MINUTE);

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]
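A minimal sketch of the fix suggested in the reply, assuming the goal is simply to count records per 10-minute processing-time window. The window runs over source_table, where proctime is declared with proctime() and is therefore a time attribute. When proctime is written into sink_table it is materialized as a plain TIMESTAMP(3), and scanning sink_table back from Kafka does not restore the time attribute unless sink_table's own DDL declares one (for example a WATERMARK, as observed in the question).

```sql
-- Sketch only: aggregate on the table that actually declares the time attribute.
-- source_table defines proctime as proctime(), so TUMBLE can use it directly.
select TUMBLE_START(proctime, INTERVAL '10' MINUTE), count(1)
from source_table
group by TUMBLE(proctime, INTERVAL '10' MINUTE);
```

If the windowed counts should end up in Kafka, the same query can feed an INSERT INTO a sink table whose columns match the window start and the count, rather than re-reading sink_table.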
