Hi,
因为业务原因具体的keyby字段没有写清楚,我是根据warningPojo类里面的字段进行排序,源数据
是从kafka实时流传输过来的,每一分钟滑动窗口计算一次
SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator1 =
env.addSource(stringFlinkKafkaConsumerBase)
.filter((String s) -> (s.split(",", -1).length == 34))
.flatMap(new RichFlatMapFunction<String, warningPojo>() {
.keyBy("src", "msg")
.timeWindow(Time.minutes(1))
.process(new ProcessWindowFunction<warningPojo,
Tuple2<warningPojo, String>, Tuple, TimeWindow>()
.setParallelism(1);
每次执行这段流代码就只有第一次的一分钟时间窗口有数据传输到es,之后就没有数据了。
在 2020-07-09 13:09:32,"Yichao Yang" <[email protected]> 写道:
>Hi,
>
>
>根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。
>并且可以看下使用到的是处理时间还是事件时间?
>如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。
>
>
>Best,
>Yichao Yang
>
>
>
>
>------------------ 原始邮件 ------------------
>发件人: "爱吃鱼"<[email protected]>;
>发送时间: 2020年7月9日(星期四) 中午11:37
>收件人: "user-zh"<[email protected]>;
>
>主题: flink时间窗口
>
>
>
>你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
>SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator =
>flatMap.keyBy(0,1)
>
> .timeWindow(Time.minutes(1))
>
> .process(new ProcessWindowFunction)
>当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。