Hello, I have an application which has two different streams of data, one represents a set of events and the other a set of rules that need to be matched against the events. In order to do this I use a coFlatMapOperator. The problem is that if I assign the timestamps and watermarks after the streams have been connected everything works fine but if I do it before, I get a negative *currentwatermark* at the window and the operations on windows have no effect. What could be the problem?
If I assign *Before *the connect: <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/negativeWatermark.png> If I assign *After *the connect: <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/normalWatermark.png> *Main *Code: / DataStream<CSVEvent> sourceStream = environment .addSource(new SampleDataGenerator(sourceData, true)).name("Source").setParallelism(1) .assignTimestampsAndWatermarks(new TimestampAssigner()); *// if I assign the timestamps here the watermak seen at the window is negative and the operations are not applied* DataStream<String> rulesStream = environment .socketTextStream(monitorAddress, monitorPort, DELIMITER) .name("Rules Stream") .setParallelism(1); SplitStream<RBEvent> processedStream = sourceStream.connect(rulesStream) .flatMap(new RProcessor(rulesPath)).name("RBProcessor").setParallelism(1) //.assignTimestampsAndWatermarks(new DynamicTimestampAssigner()).name("Assign Timestamps").setParallelism(1) *// If I assign the watermarks here everything works fine* .split(new Spliter()); processedStream .select(RuleOperations.WINDOW_AGGRATION) .keyBy(new DynamicKeySelector()) .window(new DynamicSlidingWindowAssigner()) .apply(new AggregationOperation()).name("Aggregation Operation").setParallelism(1) .print().name("Windowed Rule Output").setParallelism(1); (..omitted details..)/ * Timestamps and watermarks* assigner: / public class TimestampAssigner implements AssignerWithPeriodicWatermarks<CSVEvent> { private final long MAX_DELAY = 2000; // 2 seconds private long currentMaxTimestamp; private long lastEmittedWatermark = Long.MIN_VALUE; @Override public long extractTimestamp(CSVEvent element, long previousElementTimestamp) { long timestamp = Long.parseLong(element.event.get(element.getTimeField())); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } @Override public Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the out-of-orderness bound long potentialWM = currentMaxTimestamp - MAX_DELAY; if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } return new Watermark(lastEmittedWatermark); } }/ Regards, Pedro Chaves ----- Best Regards, Pedro Chaves -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-Negative-currentWatermark-if-the-watermark-assignment-is-made-before-connecting-the-streams-tp10315.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.