Hi Flink users,

We are seeing strange behavior where data goes missing when we use reduce(), but not when we use apply(). The problem does not occur on Flink 1.0.3; we only see it on 1.1.3. By "losing data" we mean that for the affected keys we see no events at all, not just a wrong count of events.
The code is as follows:

DataStream<Map<String, Object>> streams = env
    .addSource(new FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties))
    .name("kafka_topics")
    .rebalance()
    .flatMap(new Eventsmap(events))
    .assignTimestampsAndWatermarks(new EventWatermark());

DataStream<Map<String, Object>> count = streams
    .keyBy(new CompoundJsonKeySelector())
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.minutes(3))
    // apply() works fine:
    // .apply(new WindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow>() {
    //     @Override
    //     public void apply(String s, TimeWindow timeWindow,
    //                       Iterable<Map<String, Object>> iterable,
    //                       Collector<Map<String, Object>> collector) throws Exception {
    //         // emit the first element of the window
    //         Iterator<Map<String, Object>> it = iterable.iterator();
    //         collector.collect(it.next());
    //     }
    // });
    // reduce() loses data:
    .reduce(new ReduceFunction<Map<String, Object>>() {
        @Override
        public Map<String, Object> reduce(Map<String, Object> v1, Map<String, Object> v2) throws Exception {
            // sum the "count" fields of the two elements and carry the result forward in v2
            int newCount = Integer.parseInt(v1.get("count").toString())
                         + Integer.parseInt(v2.get("count").toString());
            v2.put("count", newCount);
            return v2;
        }
    });

Is there any suggestion on what we can try to figure out the root cause?

Best,
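P.S. For comparison, below is a rough sketch of an apply()-based version that should be equivalent to the reduce() above, assuming the goal is simply to sum the "count" field per key and window (the String key type is taken from the commented-out apply()). We could run it side by side with the reduce() job to check whether the missing keys show up there.

import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// "streams" and CompoundJsonKeySelector are the same as in the job above.
DataStream<Map<String, Object>> countViaApply = streams
    .keyBy(new CompoundJsonKeySelector())
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.minutes(3))
    .apply(new WindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow>() {
        @Override
        public void apply(String key, TimeWindow window,
                          Iterable<Map<String, Object>> values,
                          Collector<Map<String, Object>> out) throws Exception {
            // Sum the "count" field over all elements of the window,
            // mirroring what the ReduceFunction does incrementally.
            Map<String, Object> last = null;
            int total = 0;
            for (Map<String, Object> value : values) {
                total += Integer.parseInt(value.get("count").toString());
                last = value;
            }
            if (last != null) {
                last.put("count", total);
                out.collect(last);
            }
        }
    });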