I have made some recent findings.

To answer your question: it does not happen with the DirectRunner, so I
focused on our FlinkRunner configuration.

I found that adding this config alleviates the problem:
optionsWithFlinkOptions.setCheckpointingInterval(100)

Previously, this option was not set at all. If someone understands why it
affects the polling interval passed to `continuously`, I'd love to know.
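
For anyone trying to reproduce this, here is a minimal sketch of where the
setting goes. It assumes the options are built with `PipelineOptionsFactory`;
the object name `CheckpointingSketch` and the use of `args` are illustrative
and not taken from the actual project. The interval is in milliseconds.

```scala
import org.apache.beam.runners.flink.{FlinkPipelineOptions, FlinkRunner}
import org.apache.beam.sdk.options.PipelineOptionsFactory

// Hypothetical entry point, for illustration only.
object CheckpointingSketch {
  def main(args: Array[String]): Unit = {
    // View the command-line options as Flink-specific pipeline options.
    val optionsWithFlinkOptions = PipelineOptionsFactory
      .fromArgs(args: _*)
      .as(classOf[FlinkPipelineOptions])
    optionsWithFlinkOptions.setRunner(classOf[FlinkRunner])

    // Trigger a checkpoint every 100 ms. Without this call, the option
    // keeps its default, which leaves checkpointing disabled.
    optionsWithFlinkOptions.setCheckpointingInterval(100L)
  }
}
```

The same value should also be settable on the command line as
`--checkpointingInterval=100`.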

Please and thank you,
Marco.

On Mon, Sep 27, 2021 at 1:52 PM Chamikara Jayalath <[email protected]>
wrote:

> This does sound like a bug, more specifically regarding timer invocations
> of the Flink runner.
> Do you run into the same issue with other runners?
>
> On Thu, Sep 23, 2021 at 6:01 PM Marco Costantini <
> [email protected]> wrote:
>
>> Hi team,
>> I'm experiencing what I think is a bug with FileIO. I've got a very
>> minimal pipeline which reproduces it:
>>
>> ```
>>     pipeline
>>       .apply(FileIO.`match`.filepattern(String.format("%s/**/*", location))
>>         .continuously(Duration.standardSeconds(100), Watch.Growth.never()))
>>       .apply(FileIO.readMatches)
>>       .getPipeline
>>       .run
>>       .waitUntilFinish()
>> ```
>>
>> What happens is that, in my logging, it polls far more often than
>> requested. The logs look like this:
>>
>> ```
>> 2021-09-23 23:08:09,570 INFO  org.apache.beam.sdk.transforms.Watch [] - s3://bucket/raw-events//**/* - will resume polling in 100000 ms.
>> 2021-09-23 23:08:09,633 INFO  org.apache.beam.sdk.transforms.Watch [] - s3://bucket/raw-events//**/* - current round of polling took 62 ms and returned 0 results, of which 0 were new.
>> 2021-09-23 23:08:09,634 INFO  org.apache.beam.sdk.transforms.Watch [] - s3://bucket/raw-events//**/* - will resume polling in 100000 ms.
>> 2021-09-23 23:08:09,701 INFO  org.apache.beam.sdk.transforms.Watch [] - s3://bucket/raw-events//**/* - current round of polling took 66 ms and returned 0 results, of which 0 were new.
>> ```
>>
>> Rather than once every 100 seconds, it polls many times PER second.
>> Regardless of what I change the duration to, it seems to poll at this same
>> frequency.
>>
>> Some technical aspects of my project:
>> - Beam 2.32
>> - Flink 1.12.2
>> - FlinkRunner 1-12
>> - Building on Java 8
>>
>> Any ideas or help is greatly appreciated. Please and thank you,
>> Marco.
>>
>