Pane info is supposed to be preserved across transforms. If the Flink runner
does not preserve it, then I believe that is a bug.
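To make "preserved across transforms" concrete, here is a toy model in plain Java. It is not Beam's API: `Element`, `Pane`, `preservingShuffle`, `droppingShuffle` and `refiringShuffle` are hypothetical names. It contrasts three redistribution steps: one that keeps pane metadata, one that rebuilds elements from value and window only (so the pane falls back to an "unknown" placeholder), and one that groups and re-fires each element as a new pane whose timing depends on the watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model for illustration only; these types are NOT Beam's PaneInfo/WindowedValue API.
public class PanePropagationSketch {

    // Loosely modelled on Beam's PaneInfo.Timing values.
    enum Timing { EARLY, ON_TIME, LATE, UNKNOWN }

    // Hypothetical pane metadata attached to each element.
    record Pane(boolean isFirst, boolean isLast, Timing timing, long index) {
        // Stand-in for "no firing information", similar in spirit to PaneInfo.NO_FIRING.
        static final Pane NO_FIRING = new Pane(true, true, Timing.UNKNOWN, 0);
    }

    // Hypothetical element: a value, the max timestamp of its window, and its pane.
    record Element(String value, long windowMaxTimestamp, Pane pane) {}

    // Redistribution that forwards every element unchanged, metadata included.
    static List<Element> preservingShuffle(List<Element> in) {
        return new ArrayList<>(in);
    }

    // Redistribution that keeps value and window but rebuilds the element without its pane.
    static List<Element> droppingShuffle(List<Element> in) {
        List<Element> out = new ArrayList<>();
        for (Element e : in) {
            out.add(new Element(e.value(), e.windowMaxTimestamp(), Pane.NO_FIRING));
        }
        return out;
    }

    // Grouping that re-fires each element as its own new pane (index 0, not final).
    // Timing follows the watermark: EARLY until the watermark passes the end of the
    // window, ON_TIME afterwards (LATE firings are left out of this toy model).
    static List<Element> refiringShuffle(List<Element> in, long inputWatermark) {
        List<Element> out = new ArrayList<>();
        for (Element e : in) {
            Timing t = inputWatermark <= e.windowMaxTimestamp() ? Timing.EARLY : Timing.ON_TIME;
            out.add(new Element(e.value(), e.windowMaxTimestamp(), new Pane(true, false, t, 0)));
        }
        return out;
    }

    public static void main(String[] args) {
        long windowMax = 1_000L;
        List<Element> before = List.of(
            new Element("data", windowMax, new Pane(true, true, Timing.ON_TIME, 0)));

        System.out.println("preserving: " + preservingShuffle(before).get(0).pane());
        System.out.println("dropping:   " + droppingShuffle(before).get(0).pane());
        System.out.println("re-firing:  " + refiringShuffle(before, 500L).get(0).pane());
    }
}
```

In this model, the DirectRunner observation further down (ON_TIME before the write, EARLY after) resembles `refiringShuffle`, while the Flink/Spark observation (`PaneInfo.NO_FIRING`) resembles `droppingShuffle`. The model does not establish that this is what each runner actually does.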
On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek wrote:
> I am using FileIO and I also observe pane info being dropped on the
> Flink runner. It was mentioned in this thread:
> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>
> It is a result of a different Reshuffle expansion used for optimisation
> reasons. However, I did not observe any data loss in my case. Windowing and
> watermark info should be preserved. Pane info is not, which raises the
> question of how reliable pane info should be in terms of the SDK and runner.
>
> If you do observe data loss, it would be great if you could share a test
> case that reproduces the problem.
>
> On Sun, May 10, 2020 at 8:03 AM Reuven Lax wrote:
>
>> Ah, I think I see the problem.
>>
>> It appears that for some reason, the Flink runner loses windowing
>> information when a Reshuffle is applied. I'm not entirely sure why, because
>> windowing information should be maintained across a Reshuffle.
>>
>> Reuven
>>
>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel wrote:
>>
>>>
>>> Hi,
>>>
>>> I have added some logs to the pipeline as follows (you can find the
>>> log function in the Appendix):
>>>
>>> // STREAM + processing time.
>>> pipeline.apply(KafkaIO.read())
>>>     .apply(...) // mappers, a window and a combine
>>>     .apply(logBeforeWrite())
>>>     .apply("WriteFiles",
>>>         TextIO.writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>     .getPerDestinationOutputFilenames()
>>>     .apply(logAfterWrite())
>>>     .apply("CombineFileNames", Combine.perKey(...));
>>>
>>> I have run the pipeline using the DirectRunner (local), and the SparkRunner
>>> and FlinkRunner (both on a cluster).
>>> Below you can see the timing and pane information before and after the
>>> write (detailed traces with window and timestamp information are in the
>>> Appendix).
>>>
>>> DirectRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [AfterWrite] timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>
>>> FlinkRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [AfterWrite] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>
>>> SparkRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [AfterWrite] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>
>>> It seems the DirectRunner propagates the windowing information as expected.
>>> However, I am not sure whether TextIO really propagates the pane or just
>>> emits a new one, because the timing is ON_TIME before TextIO and EARLY
>>> after it. In any case, with the FlinkRunner and the SparkRunner neither the
>>> timing nor the pane is set.
>>>
>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>>> seeing that the DirectRunner populated the windowing data, I am not sure.
>>>
>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>
>>>
>>> Appendix
>>> ---
>>> Here you can see the log function and detailed traces for the different
>>> runners.
>>>
>>> // Assumes `log` is an SLF4J Logger field (the {} placeholders suggest SLF4J)
>>> // and that the elements are Strings.
>>> private ParDo.SingleOutput<String, String> logBeforeWrite() {
>>>     return ParDo.of(new DoFn<String, String>() {
>>>         @ProcessElement
>>>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>             // Log the window, timestamp and pane metadata of each element.
>>>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, "
>>>                     + "index ={}, isFirst ={}, isLast={}, pane={}",
>>>                 boundedWindow,
>>>                 context.timestamp(),
>>>                 context.pane().getTiming(),
>>>                 context.pane().getIndex(),
>>>                 context.pane().isFirst(),
>>>                 context.pane().isLast(),
>>>                 context.pane());
>>>             // Pass the element through unchanged.
>>>             context.output(context.element());
>>>         }
>>>     });
>>> }
>>>
>>> The logAfterWrite function logs the same information.
>>>
>>> Detailed traces:
>>>
>>> DirectRunner (local):
>>> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index =0, isFirst =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>
>>>
>>> FlinkRunner (cluster):
>>> [Before Write] Element=data
>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>> timestamp=2020-05-09T15:13:59.999Z,