大家好。

我用的tumbling window,
ds.keyBy(CandleView::getMarketCode)
                .timeWindow(Time.minutes(5L))
               
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                .aggregate(new OhlcAggregateFunction(), new
OhlcWindowFunction())
                .addSink(new PgSink(jdbcUrl, userName, password,
candle_table_5m))
                .name(candle_table_5m);

Sliding Window:

ds.keyBy(CandleView::getMarketCode)
                .timeWindow(Time.hours(24L), Time.seconds(2))
                .aggregate(new OhlcAggregateFunction(), new
TickerWindowFunction())
                .addSink(new PgSink(jdbcUrl, userName, password,
candle_table_24h))
                .name(candle_table_24h);

一个是基于5分钟的窗口,一个是基于24小时的sliding窗口,24小时的窗口都已经update到了最新时间,但5分钟的滞后了越来越长时间,job运行不到2小时,已经滞后快20分钟,即将近4个窗口。
基于的是同一个dataStream

有没有什么建议,或者哪个地方用错了? 谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复