And is it possible to share state across parallel instances with AssignerWithPunctuatedWatermarks?
Thanks!

On Wed, Sep 14, 2016 at 9:52 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> the problem might be that your timestamp/watermark assigner is run in
> parallel and that only one parallel instance of those operators emits the
> watermark because only one of those parallel instances sees the element
> with _3 == 9000. For the watermark to advance at an operator it needs to
> advance in all upstream operations.
>
> Cheers,
> Aljoscha
>
> On Fri, 9 Sep 2016 at 18:29 Saiph Kappa <saiph.ka...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a streaming (event time) application where I am receiving events
>> with the same assigned timestamp. I receive 10000 events in total on a
>> window of 5 minutes, but I emit a watermark when 9000 elements have been
>> received. This watermark is 6 minutes after the assigned timestamps. My
>> question is: why does the function that is associated with the window read
>> 10000 elements and not 9000? All elements that have a timestamp lower than
>> the watermark should be ignored (1000), but it's not happening.
>>
>> Here is part of the code:
>> «
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> val rawStream = env.socketTextStream("localhost", 4321)
>>
>> val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
>>   val timestamp = System.currentTimeMillis()
>>
>>   override def extractTimestamp(element: (String, Int, Long),
>>       previousElementTimestamp: Long): Long =
>>     timestamp
>>
>>   override def checkAndGetNextWatermark(lastElement: (String, Int, Long),
>>       extractedTimestamp: Long): Watermark = {
>>     if (lastElement._3 == 9000) {
>>       val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
>>       new watermark.Watermark(ts)
>>     } else null
>>   }
>> }
>>
>> val stream = rawStream.map(line => {
>>   val Array(p1, p2, p3) = line.split(" ")
>>   (p1, p2.toInt, p3.toLong)
>> })
>>   .assignTimestampsAndWatermarks(punctuatedAssigner)
>>
>> stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)
>> »
>>
>> Thanks!
>>
>
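
For illustration, the rule Aljoscha describes (an operator's watermark only advances once it has advanced on all upstream parallel instances) can be modelled in a few lines of plain Scala. This is a simplified sketch, not Flink code: the object name, the instance count of four, and the base timestamp are all made up for the example, and Long.MinValue is used here to stand for "no watermark emitted yet".

```scala
// Simplified model of watermark propagation (NOT Flink API): the downstream
// operator's watermark is taken as the minimum over its upstream instances.
object WatermarkMinDemo {
  // Stand-in for "this instance has not emitted any watermark yet".
  val NoWatermark: Long = Long.MinValue

  // Watermark seen by the downstream (window) operator.
  def downstreamWatermark(upstream: Seq[Long]): Long = upstream.min

  def main(args: Array[String]): Unit = {
    val assignedTs = 1000000L                      // hypothetical shared timestamp
    val punctuated = assignedTs + 6 * 60 * 1000L   // watermark 6 minutes later

    // Four parallel assigner instances (assumed); only one of them saw the
    // element with _3 == 9000 and therefore emitted the watermark.
    val onlyOneEmitted = Seq(NoWatermark, NoWatermark, punctuated, NoWatermark)
    println(s"advanced (one instance emitted): ${downstreamWatermark(onlyOneEmitted) == punctuated}")

    // If every instance emits it, the downstream watermark does advance.
    val allEmitted = Seq.fill(4)(punctuated)
    println(s"advanced (all instances emitted): ${downstreamWatermark(allEmitted) == punctuated}")
  }
}
```

Under this model the window cannot fire on the punctuated watermark while any assigner instance is still silent, which would be consistent with all 10000 elements ending up in the window.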