Hi,
the single-element windows indicate to me that they originate from elements that arrived at the window operator after the watermark had already passed the end of their window. In the current version of Flink such late elements are emitted as single-element windows. You can avoid this by writing a custom EventTimeTrigger that does not fire on late elements; see the sketch below. In Flink 1.1 we also introduce a setting that lets you specify an allowed lateness, after which late elements are dropped.
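Roughly, such a trigger could look like the following. This is an untested sketch, not a drop-in solution: the class name DropLateEventTimeTrigger is just a placeholder, and it is written against the 1.1 Trigger API (in 1.1 Trigger is an abstract class with a clear() method and TriggerContext exposes getCurrentWatermark(); on 1.0.x Trigger is an interface, so the skeleton differs slightly). It behaves like the built-in EventTimeTrigger, except that elements arriving after the watermark has passed the end of their window are purged instead of causing another firing:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class DropLateEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // The watermark is already past the end of this window, so the
            // element is late: drop it instead of firing a one-element window.
            return TriggerResult.PURGE;
        }
        // On-time element: register the timer that will eventually fire the window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Fire once the watermark reaches the end of the window, then purge its contents.
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // Purely event-time based, so processing-time timers are ignored.
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

You would then plug it into your job with something like

streams.timeWindowAll(Time.minutes(1))
       .trigger(new DropLateEventTimeTrigger())
       .apply(new SumAllWindow());

Once you are on 1.1, the allowed-lateness setting (allowedLateness() on the windowed stream) gives you the same effect without a custom trigger.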
Cheers,
Aljoscha

On Fri, 29 Jul 2016 at 17:30 Sendoh <unicorn.bana...@gmail.com> wrote:

> Hi Flink users,
>
> We have an issue where timeWindowAll() doesn't assign elements properly. The
> sums should be in the same window but are generated in separate windows.
>
> For example, in the output below, window 832348384 has window start time
> 2016-07-20T05:57:00.000 with a count of 36, and there is another window
> 832348384 with the same start time 2016-07-20T05:57:00.000 and a count of 1.
> They should be aggregated in the same window 832348384 with a count of 37.
>
> ... // hashCode of the window, sum of events in the window, window start time
> {"hashCode":-832348384,"count":36,"startDate":"2016-07-20T05:57:00.000"}
> {"hashCode":-832348384,"count":1,"startDate":"2016-07-20T05:57:00.000"}
> {"hashCode":-830444128,"count":452,"startDate":"2016-07-20T05:58:00.000"}
> {"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
> {"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
> {"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
> ...
>
> Example code is as follows:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", Config.bootstrapServers);
> properties.setProperty("group.id", parameter.getRequired("groupId"));
> properties.setProperty("auto.offset.reset", "earliest");
>
> FlinkKafkaConsumer09<JSONObject> kafkaConsumer =
>         new FlinkKafkaConsumer09<>(Config.topic, new JSONSchema(), properties);
>
> DataStream<JSONObject> streams = env.addSource(kafkaConsumer)
>         .assignTimestampsAndWatermarks(new CorrelationWatermark())
>         .rebalance();
>
> DataStream<JSONObject> afterWindow = streams
>         .timeWindowAll(Time.minutes(1))
>         .apply(new SumAllWindow());
>
> public static class SumAllWindow implements
>         AllWindowFunction<JSONObject, JSONObject, TimeWindow> {
>
>     @Override
>     public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
>                       Collector<JSONObject> collector) throws Exception {
>
>         DateTime startTs = new DateTime(timeWindow.getStart());
>         JSONObject jsonObject = new JSONObject();
>
>         int sum = 0;
>         for (JSONObject value : values) {
>             sum += 1;
>         }
>
>         jsonObject.put("startDate", startTs.toString());
>         jsonObject.put("count", sum);
>         jsonObject.put("hashCode", timeWindow.hashCode());
>         collector.collect(jsonObject);
>     }
> }
>
> public class CorrelationWatermark implements
>         AssignerWithPeriodicWatermarks<JSONObject> {
>
>     private final long maxOutOfOrderness = 10000 * 1;
>     private long currentMaxTimestamp;
>
>     @Override
>     public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
>         long timestamp = DateTime.parse(element.get("occurredAt").toString(),
>                 Config.timeFormatter).getMillis();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>     }
> }
>
> We have no problem with a smaller Kafka topic on Flink 1.0.3. Are we making
> a mistake somewhere?
> Please let me know if any further information is required to resolve this
> issue.
>
> Best,
>
> Sendoh
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeWindowAll-doeesn-t-assign-properly-tp8201.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.