reduce()方法的状态在窗口间未被隔离,多个窗口聚合时使用的是同一对象.一个数据进入时,被重复累加
是reduce的特性吗? 还是reduce中的窗口间隔离出现问题? 希望得到回复

测试输入如下:
1001,/home,1000
1002,/home,2000

输出如下:
input> test.Event(user=1001, page=/home, ts=1000)
input> test.Event(user=1002, page=/home, ts=2000)
test.WordCount(word=/home, count=2)
test.WordCount(word=/home, count=3)

代码如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.Serializable;
import java.time.Duration;

public class test {
    public static void main(String[] args) {
        //准备环境
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从端口读数据
        SingleOutputStreamOperator<Event> ds1 = 
env.socketTextStream("hadoop102", 55555).map(
                value->{
                    String[] strings = value.split(",");
                    return new 
Event(strings[0].trim(),strings[1].trim(),Long.valueOf(strings[2].trim()) );
                }

        ).assignTimestampsAndWatermarks(
                //增加水位线策略
                
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((Event,
 l) -> Event.getTs())
        );
        //检查输入流
        ds1.print("input");


        ds1.map(event -> new WordCount(event.getPage(), 1)
        ).keyBy(WordCount::getWord
                //按键分组
        ).window(
                //TumblingEventTimeWindows.of(Time.seconds(10))
                SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))
                //size为10步长为5的滑动窗口
        ).reduce(
                //先增量聚合.将多个数据处理为一个中间结果

                (wordCount1, wordCount2) -> {

                    Integer count = wordCount1.getCount();

                    wordCount1.setCount(count + 1);

                    System.out.println(wordCount1);

                    return wordCount1;
                }


        );

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Event {
        private String user;
        private String page;
        private Long ts;

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor

    public static class WordCount implements Serializable {
        private String word;
        private Integer count;

    }



}

回复