reduce() 方法的状态在窗口之间没有被隔离：多个窗口聚合时使用的是同一个对象，一条数据进入后会被重复累加。这是 reduce 本身的特性吗？还是 reduce 的窗口间状态隔离出现了问题？希望得到回复。
测试输入如下: 1001,/home,1000 1002,/home,2000 输出如下: input> test.Event(user=1001, page=/home, ts=1000) input> test.Event(user=1002, page=/home, ts=2000) test.WordCount(word=/home, count=2) test.WordCount(word=/home, count=3) 代码如下: import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.io.Serializable; import java.time.Duration; public class test { public static void main(String[] args) { //准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //从端口读数据 SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 55555).map( value->{ String[] strings = value.split(","); return new Event(strings[0].trim(),strings[1].trim(),Long.valueOf(strings[2].trim()) ); } ).assignTimestampsAndWatermarks( //增加水位线策略 WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((Event, l) -> Event.getTs()) ); //检查输入流 ds1.print("input"); ds1.map(event -> new WordCount(event.getPage(), 1) ).keyBy(WordCount::getWord //按键分组 ).window( //TumblingEventTimeWindows.of(Time.seconds(10)) SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)) //size为10步长为5的滑动窗口 ).reduce( //先增量聚合.将多个数据处理为一个中间结果 (wordCount1, wordCount2) -> { Integer count = wordCount1.getCount(); wordCount1.setCount(count + 1); System.out.println(wordCount1); return wordCount1; } ); try { env.execute(); } catch (Exception e) { throw new RuntimeException(e); } } @Data @AllArgsConstructor @NoArgsConstructor public static class Event { private String user; private String page; private Long ts; } @Data @AllArgsConstructor @NoArgsConstructor 
public static class WordCount implements Serializable { private String word; private Integer count; } }