Hi, Here is my issue with *Event Processing* with the *add() method of MGroupingWindowAggregate not being called* even though a new watermark is fired 1. *Ingest data from Kinesis (works fine)* 2. *Deserialize* in MonitoringMapKinesisSchema(*works fine* and get json back) 3. I do *assign MonitoringTSWAssigner*(code below) to the source with bound of 10(have tried 3000, 30000). *It fires a new WaterMark* with each incoming record but the *windowStream.aggregate method doesn't seem to fire* and I *don't see the add() method of MGroupingWindowAggregatecalled *???? I *can see the newWaterMark being emitted in TimestampsAndPunctuatedWatermarksOperator.processElement* 4. I have tried with timeWindow of 1m and 15s
*Main* code: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.*EventTime*); //Setup Kinesis Consumer Properties kinesisConsumerConfig = new Properties(); .. kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>( "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig); DataStream<Map<String, Object>> kinesisStream; RichSinkFunction<InfluxDBPoint> influxSink; DataStreamSource<Map<String, Object>> monitoringDataStreamSource = env.addSource(kinesisConsumer); kinesisStream = monitoringDataStreamSource .assignTimestampsAndWatermarks(new *MonitoringTSWAssigner*(bound)); influxSink = pms.createInfluxMonitoringSink(....); ...... ...timeWindow = Time.seconds(*timeIntervalL*);//tried with timeIntervalL=15s, 1m KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream = kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow); DataStream<InfluxDBPoint> enrichedMGStream = *windowStream.aggregate*(//*<===== never reaches here ?????* *new MGroupingWindowAggregate(interval)*, new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule)) .map(new MonitoringGroupingToInfluxDBPoint(rule)); enrichedMGStream.addSink(influxSink); env.execute("Aggregation of Map data"); *MonitoringTSWAssigner* code: public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> { private long bound = 5 * (long) 1000;//5 secs out of order bound in millisecs private long maxTimestamp = Long.MIN_VALUE; public MonitoringTSWAssigner() { } public MonitoringTSWAssigner(long bound) { this.bound = bound; } public long extractTimestamp(Map<String, Object> monitoring, long previousTS) { long extractedTS = getExtractedTS(monitoring); if (extractedTS > maxTimestamp) { maxTimestamp = extractedTS; } return extractedTS;//return System.currentTimeMillis(); } public long getExtractedTS(Map<String, Object> monitoring) { final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : ""; return Utils.getLongFromDateStr(eventTimestamp); } @Override public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) { long extractedTS = getExtractedTS(monitoring); long nextWatermark = maxTimestamp - bound; return new Watermark(nextWatermark); } } *MGroupingWindowAggregate*: public class MGroupingWindowAggregate implements *AggregateFunction*<Map<String, Object>, Map<String, Object>, Map<String, Object>> { private final String interval; public MGroupingWindowAggregate(String interval) { this.interval = interval; } public Map<String, Object> createAccumulator() { return new ConcurrentHashMap<>(); } public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) { ..... } ..... } TIA,