Hi,
- proctime is a virtual (computed) column.
- rowtime is a column that holds real data.
It looks like you need to define a rowtime in sink_table, for example like this:
CREATE TABLE sink_table (
ip VARCHAR,
proctime timestamp(3),
WATERMARK FOR proctime AS proctime
....
)
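
A fuller sketch of that idea, in case it helps. The WITH options are copied from your sink_table. The 5-second watermark delay is an assumed value, not something taken from your job, so tune it to how far out of order your data can arrive. Because the watermark turns proctime into an event-time (rowtime) attribute here, a window only emits its result once the watermark passes the window end.

```sql
-- Sketch only: declares the physical proctime column as an event-time attribute.
CREATE TABLE sink_table (
  ip VARCHAR,
  proctime TIMESTAMP(3),
  -- Assumption: tolerate up to 5 seconds of out-of-order data.
  WATERMARK FOR proctime AS proctime - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.startup-mode' = 'latest-offset',
  'connector.topic' = 'ip_agg',
  'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
  'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

-- The tumbling window now groups on a time attribute.
SELECT TUMBLE_START(proctime, INTERVAL '10' MINUTE), COUNT(1)
FROM sink_table
GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE);
```
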
Best,
Jingsong Lee
On Thu, May 21, 2020 at 9:17 PM Benchao Li <[email protected]> wrote:
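> Judging from the SQL you provided, you are running the window aggregation directly on sink_table, and sink_table does not define a time attribute.
> (Is that a typo? Did you mean to run the window aggregation on source_table?)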
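>
> If the window was meant to run on source_table, a sketch of that query could
> look like the following. It assumes the source_table DDL from the original
> mail, where proctime is declared as proctime(), so the column is already a
> processing-time attribute:
>
> ```sql
> -- Sketch: tumbling window over the processing-time attribute of source_table.
> SELECT TUMBLE_START(proctime, INTERVAL '10' MINUTE), COUNT(1)
> FROM source_table
> GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE);
> ```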
>
> On Thu, May 21, 2020 at 9:08 PM 了不起的盖茨比 <[email protected]> wrote:
>
> > error:Window aggregate can only be defined over a time attribute column,
> > but TIMESTAMP(3) encountered.
> > If I add a watermark to sink_table, the error goes away. I defined the time attribute in the source and the sink receives that time, so why does the
> > GROUP BY raise this error?
> > CREATE TABLE source_table (
> > sip VARCHAR,
> > proctime as proctime()
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.startup-mode' = 'latest-offset',
> > 'connector.topic' = 'skyeye-tcpflow',
> > 'connector.properties.group.id' = 'testGroup',
> > 'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> > 'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> > 'update-mode' = 'append',
> > 'format.type' = 'json',
> > 'format.derive-schema' = 'true'
> > );
> >
> >
> > CREATE TABLE sink_table (
> > ip VARCHAR,
> > proctime timestamp(3)
> >
> >
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.startup-mode' = 'latest-offset',
> > 'connector.topic' = 'ip_agg',
> > 'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> > 'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> > 'update-mode' = 'append',
> > 'format.type' = 'json',
> > 'format.derive-schema' = 'true'
> > );
> >
> > insert into sink_kafka select sip,proctime from source_kafka;
> >
> >
> >
> > select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from
> > sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [email protected]; [email protected]
>