And is it possible to share state across the parallel instances of an
AssignerWithPunctuatedWatermarks?

Thanks!

On Wed, Sep 14, 2016 at 9:52 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> the problem might be that your timestamp/watermark assigner runs in
> parallel, and only one parallel instance of that operator emits the
> watermark, because only one of those instances sees the element with
> _3 == 9000. For the watermark to advance at an operator, it needs to
> advance in all upstream parallel instances: an operator's event time is
> the minimum of the watermarks received on all of its inputs.
>
> Cheers,
> Aljoscha
>
> On Fri, 9 Sep 2016 at 18:29 Saiph Kappa <saiph.ka...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a streaming (event-time) application in which all events are
>> assigned the same timestamp. I receive 10000 events in total within a
>> 5-minute window, but I emit a watermark once 9000 elements have been
>> received. This watermark is 6 minutes after the assigned timestamp. My
>> question is: why does the window function read 10000 elements and not
>> 9000? The last 1000 elements arrive after the watermark and have
>> timestamps lower than it, so they should be dropped as late, but that is
>> not happening.
>>
>> Here is part of the code:
>> «
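>> import java.util.concurrent.TimeUnit
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.watermark.Watermark
>> import org.apache.flink.streaming.api.windowing.time.Time
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> val rawStream = env.socketTextStream("localhost", 4321)
>>
>> val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
>>   // Evaluated once, when the assigner is created, so every element gets the same timestamp.
>>   val timestamp = System.currentTimeMillis()
>>
>>   override def extractTimestamp(element: (String, Int, Long),
>>                                 previousElementTimestamp: Long): Long =
>>     timestamp
>>
>>   // Emit a watermark 6 minutes past the timestamp when element number 9000 arrives.
>>   override def checkAndGetNextWatermark(lastElement: (String, Int, Long),
>>                                         extractedTimestamp: Long): Watermark =
>>     if (lastElement._3 == 9000) {
>>       val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
>>       new Watermark(ts)
>>     } else null // null means "no watermark for this element"
>> }
>>
>> // Parse "p1 p2 p3" lines into (String, Int, Long) tuples, then assign timestamps/watermarks.
>> val stream = rawStream
>>   .map { line =>
>>     val Array(p1, p2, p3) = line.split(" ")
>>     (p1, p2.toInt, p3.toLong)
>>   }
>>   .assignTimestampsAndWatermarks(punctuatedAssigner)
>>
>> // `function` is the window function, which is not shown in this message.
>> stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)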
>> »
>>
>> Thanks!
>>
>
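The behaviour expected in the quoted question can be illustrated with a small, self-contained Scala model of how an event-time tumbling window decides that an element is late. This is a sketch, not Flink's code: it assumes 5-minute windows aligned to epoch 0 and Flink's default allowed lateness of zero, and the names `LatenessModel`, `windowStart`, `windowMaxTimestamp` and `isLate` are hypothetical.

```scala
// Simplified model (hypothetical names, not Flink's classes) of how an
// event-time tumbling window classifies late elements.
object LatenessModel {
  // Start of the tumbling window that contains `ts`, assuming windows aligned to epoch 0.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Largest timestamp that still belongs to the window (window end - 1).
  def windowMaxTimestamp(ts: Long, size: Long): Long = windowStart(ts, size) + size - 1

  // An element counts as late once the watermark has reached the last timestamp
  // of its window plus the allowed lateness (assumed 0, the default).
  def isLate(ts: Long, watermark: Long, size: Long, allowedLateness: Long = 0L): Boolean =
    windowMaxTimestamp(ts, size) + allowedLateness <= watermark
}
```

With a single assigner instance, a watermark 6 minutes past the shared timestamp always lies beyond the end of the 5-minute window containing that timestamp, so in that case the remaining 1000 elements would indeed be treated as late.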

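Aljoscha's explanation can be checked with a toy Scala model. In it, the 10000 elements are distributed round-robin over several parallel assigner instances, each instance emits a watermark only if it sees the element with `_3 == 9000`, and the window operator takes the minimum watermark over its inputs. The round-robin distribution and the names `ParallelWatermarkModel`, `emittedWatermark`, `combinedWatermark` and `simulate` are assumptions for illustration; Flink's actual channel selection and watermark bookkeeping are more involved.

```scala
// Toy model (hypothetical names; assumes round-robin distribution and one
// punctuated assigner per parallel instance) of watermark propagation.
object ParallelWatermarkModel {
  val SixMinutes: Long = 6 * 60 * 1000L

  // An instance only sees the elements routed to it. It emits timestamp + 6 minutes
  // if it saw element number 9000; Long.MinValue stands for "no watermark yet".
  def emittedWatermark(elements: Seq[(String, Int, Long)], timestamp: Long): Long =
    if (elements.exists(_._3 == 9000L)) timestamp + SixMinutes
    else Long.MinValue

  // A downstream operator's event time is the minimum over all of its input channels.
  def combinedWatermark(perChannel: Seq[Long]): Long = perChannel.min

  def simulate(parallelism: Int, numElements: Int, timestamp: Long): Long = {
    // Elements carry a sequence number 1..numElements in the third field.
    val elements = (1 to numElements).map(i => ("key", 0, i.toLong))
    // Route element i to instance i % parallelism (round-robin).
    val perInstance = (0 until parallelism).map { p =>
      elements.zipWithIndex.collect { case (e, i) if i % parallelism == p => e }
    }
    combinedWatermark(perInstance.map(emittedWatermark(_, timestamp)))
  }
}
```

With `simulate(1, 10000, t)` the combined watermark is `t` plus 6 minutes, but with any parallelism greater than 1 it stays at `Long.MinValue`, so no element is ever considered late. If the socket is closed at the end of the run, Flink likely emits a final `Long.MaxValue` watermark when the source finishes, which would fire the window with all 10000 elements, consistent with what was observed.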