If you still need help diagnosing the cause of the misbehavior, please
share more of the code with us.

On Wed, Aug 21, 2019 at 6:24 PM Eric Isling <out.code....@gmail.com> wrote:
>
> I should add that the behaviour persists, even when I force parallelism to 1.
>
> On Wed, Aug 21, 2019 at 5:19 PM Eric Isling <out.code....@gmail.com> wrote:
>>
>> Dear list-members,
>>
>> I have a question regarding window-firing and element accumulation for a 
>> slidindingwindow on a DataStream (Flink 1.8.1-2.12).
>>
>> My DataStream is derived from a custom SourceFunction, which emits 
>> stirng-sequences of WINDOW size, in a deterministic sequence.
>> The aim is to crete sliding windows over the keyedstream for processing on 
>> the accumulated strings, based on EventTime.
>> To assign EventTime and Watermark, I attech an 
>> AssignerWithPeriodicWaterMarks, to the stream.
>> The sliding window is processed with a custom ProcessWindowFunction.
>>
>> env.setStreamTimeCharacteristic(EventTime)
>> val seqStream = env.addSource(Seqstream)
>>     .assignTimestampsAndWatermarks(SeqTimeStampExtractor())
>>     .keyBy(getEventtimeKey)
>>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), 
>> Time.milliseconds(slideSize)))
>>
>> val result = seqStream.process(ProcessSeqWindow(target1))
>>
>> My AssignerWithPeriodicWaterMarks looks like this.
>> class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
>>     var waterMark  = 9999L
>>     override fun extractTimestamp(element: FASTAstring, 
>> previousElementTimestamp: Long): Long {
>>         return element.f1
>>     }
>>
>>     override fun getCurrentWatermark(): Watermark? {
>>         waterMark += 1
>>         return Watermark(waterMark)
>>     }
>> }
>>
>> In other words, each element emitted by the source should have its own 
>> EvenTime, and the WaterMark should be emitted allowing no further events for 
>> that time.
>> Stepping through the stream in a debugger, indicates that EventTime / 
>> Watremarks are generated as would expect.
>>
>> My expectation is that ProcessSeqWindow.run() ought to be called with a 
>> number of elements proportional to the time window (e.g. 10 ms), over 
>> EventTime. However, what I observe is that run() is called multiple times 
>> with single elemnts, and in an arbitrary sequence with respect to EventTime.
>>
>> My question is whether this is likely to be caused by multiple 
>> trigger-events on each window, or are there other possible explainations? 
>> How can I debug the cause?
>>
>> Thanks,
>>
>> Eric

Reply via email to