Hi,

*Event Time Window: 15s*

My currentWatermark for event-time processing is not increasing fast enough to get past the window's maxTimestamp.

I have reduced the *bound* used for the watermark calculation to just *10 ms*.

I have increased the source parallelism so that input from Kinesis is consumed in parallel by 2 slots on my laptop:

    env.addSource(kinesisConsumer).setParallelism(2);

For FlinkKinesisConsumer, I also set a property that was added in flink-1.8.0, but this didn't seem to help:

    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "25");

For reference, the check in *EventTimeTrigger*.java is:

    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
    }

My event producer writes to Kinesis with a delay of 2500 ms per record (a business requirement).

What else can I do to consume data from Kinesis faster, so that currentWatermark passes window.maxTimestamp sooner?

*MonitoringTSWAssigner* code:

    import java.util.Map;

    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {

        private long bound = 5 * (long) 1000; // 5 secs out-of-order bound, in millisecs
        private long maxTimestamp = Long.MIN_VALUE; // highest timestamp seen so far (tracked, but not used for the watermark)

        public MonitoringTSWAssigner() {
        }

        public MonitoringTSWAssigner(long bound) {
            this.bound = bound;
        }

        @Override
        public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
            long extractedTS = getExtractedTS(monitoring);
            if (extractedTS > maxTimestamp) {
                maxTimestamp = extractedTS;
            }
            return extractedTS;
            //return System.currentTimeMillis();
        }

        // Reads the event timestamp string from the record and converts it to epoch millis.
        public long getExtractedTS(Map<String, Object> monitoring) {
            final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
                    ? (String) monitoring.get(Utils.EVENT_TIMESTAMP)
                    : "";
            return Utils.getLongFromDateStr(eventTimestamp);
        }

        // Emits a watermark for every record: the record's timestamp minus the bound.
        @Override
        public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
            long extractedTS = getExtractedTS(monitoring); // currently unused
            long nextWatermark = extractedTimestamp - bound;
            return new Watermark(nextWatermark);
        }
    }

TIA
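
P.S. To make the timing concrete, here is a minimal standalone sketch (plain Java, no Flink) of how I understand the trigger condition to interact with my numbers. The class and method names (WatermarkTimeline, firstFiringTimestamp, and so on) are made up for illustration. It assumes one record every 2500 ms starting at event time 0, a 15 s tumbling window aligned at 0, and a watermark of "event timestamp minus bound", as in checkAndGetNextWatermark. It deliberately ignores parallelism and shards, so it only describes a single input stream.

```java
final class WatermarkTimeline {

    /** Last timestamp that still belongs to the window [windowStart, windowStart + windowSize). */
    static long windowMaxTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    /** Same rule as checkAndGetNextWatermark: event timestamp minus the bound. */
    static long watermarkFor(long eventTimestamp, long bound) {
        return eventTimestamp - bound;
    }

    /**
     * Walks through events spaced `interval` ms apart and returns the timestamp of the
     * first event whose watermark satisfies window.maxTimestamp() <= currentWatermark
     * for the window that contains the first event.
     */
    static long firstFiringTimestamp(long firstEvent, long interval, long bound, long windowSize) {
        // Assumption: tumbling windows with offset 0, so the start is aligned to windowSize.
        long windowStart = firstEvent - Math.floorMod(firstEvent, windowSize);
        long maxTs = windowMaxTimestamp(windowStart, windowSize);
        long ts = firstEvent;
        while (watermarkFor(ts, bound) < maxTs) {
            ts += interval; // next record arrives `interval` ms of event time later
        }
        return ts;
    }

    public static void main(String[] args) {
        long ts = firstFiringTimestamp(0L, 2_500L, 10L, 15_000L);
        System.out.println("window [0, 15000) fires on the event with timestamp " + ts);
    }
}
```

With these assumptions the first window cannot fire before the record with timestamp 17500 arrives: the record at 15000 only produces a watermark of 14990, which is still below maxTimestamp 14999. So even with a 10 ms bound, the 2500 ms spacing between records sets a floor on how soon the watermark can pass the window.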