Hello,

I have an application with two streams of data: one carries a set of events
and the other a set of rules that need to be matched against those events.
To do this I connect the two streams and use a CoFlatMap operator. If I
assign the timestamps and watermarks after the streams have been connected,
everything works fine. If I assign them before the connect, I get a negative
*currentWatermark* at the window operator and the window operations have no
effect. What could be the problem?

If I assign *before* the connect:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/negativeWatermark.png>
 

If I assign *after* the connect:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/normalWatermark.png>
 

*Main* code:

/            DataStream<CSVEvent> sourceStream = environment
                    .addSource(new SampleDataGenerator(sourceData, true))
                    .name("Source")
                    .setParallelism(1)
                    // If I assign the timestamps here, the watermark seen at the
                    // window is negative and the window operations are not applied.
                    .assignTimestampsAndWatermarks(new TimestampAssigner());

            DataStream<String> rulesStream = environment
                    .socketTextStream(monitorAddress, monitorPort, DELIMITER)
                    .name("Rules Stream")
                    .setParallelism(1);

            SplitStream<RBEvent> processedStream = sourceStream
                    .connect(rulesStream)
                    .flatMap(new RProcessor(rulesPath))
                    .name("RBProcessor")
                    .setParallelism(1)
                    // If I assign the watermarks here instead, everything works fine:
                    //.assignTimestampsAndWatermarks(new DynamicTimestampAssigner())
                    //.name("Assign Timestamps")
                    //.setParallelism(1)
                    .split(new Spliter());

            processedStream
                    .select(RuleOperations.WINDOW_AGGRATION)
                    .keyBy(new DynamicKeySelector())
                    .window(new DynamicSlidingWindowAssigner())
                    .apply(new AggregationOperation())
                    .name("Aggregation Operation")
                    .setParallelism(1)
                    .print()
                    .name("Windowed Rule Output")
                    .setParallelism(1);

(..omitted details..)/

*Timestamps and watermarks* assigner:
/
public class TimestampAssigner implements AssignerWithPeriodicWatermarks<CSVEvent> {

    private final long MAX_DELAY = 2000; // 2 seconds
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(CSVEvent element, long previousElementTimestamp) {
        long timestamp = Long.parseLong(element.event.get(element.getTimeField()));
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as the current highest timestamp minus the
        // out-of-orderness bound
        long potentialWM = currentMaxTimestamp - MAX_DELAY;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

}/
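
To make the assigner's arithmetic easier to check in isolation, here is a minimal standalone sketch of the same logic with the Flink types removed. The class name WatermarkSketch and the sample timestamps are hypothetical and only for illustration. It reproduces the computation in TimestampAssigner above, not how the connected operator treats the watermark.

```java
// Standalone sketch of the TimestampAssigner logic above, without Flink types.
// WatermarkSketch and the sample timestamps are illustrative assumptions.
public class WatermarkSketch {

    private static final long MAX_DELAY = 2000; // 2 seconds, as in the assigner
    private long currentMaxTimestamp;           // defaults to 0, as in the assigner
    private long lastEmittedWatermark = Long.MIN_VALUE;

    // Mirrors extractTimestamp: remember the highest timestamp seen so far.
    public long extractTimestamp(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Mirrors getCurrentWatermark: highest timestamp minus the bound,
    // never going below a value that was already returned.
    public long getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - MAX_DELAY;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        WatermarkSketch assigner = new WatermarkSketch();
        // Before any event: 0 - 2000 = -2000
        System.out.println(assigner.getCurrentWatermark());
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) {
            assigner.extractTimestamp(ts);
            // Prints 8000, then 10000, then 10000 (late event does not move it back)
            System.out.println(assigner.getCurrentWatermark());
        }
    }
}
```

With this logic the only negative value the assigner itself can produce is -2000, before the first event has been seen; once events with positive timestamps arrive, the value it returns follows the highest timestamp minus 2000.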

Regards,
Pedro Chaves





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-Negative-currentWatermark-if-the-watermark-assignment-is-made-before-connecting-the-streams-tp10315.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
