Hi,

I think your analysis is correct. One thing to note here is that when
implementing the StaticFileSplitEnumerator, we probably only thought
about the batch case, where no checkpoints exist [1]. On the other hand,
as you have noted, it is possible to run a bounded source in streaming
mode.

The current implementation already checkpoints the remaining splits of
the StaticFileSplitEnumerator, so it should be easy to also pass the
alreadyDiscoveredPaths to the StaticFileSplitEnumerator.
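
To illustrate the idea, here is a rough, self-contained sketch in plain
Java. It is not Flink's API: the class StaticEnumeratorRestoreSketch, its
nested Checkpoint type and its methods are hypothetical, paths and splits
are modeled as plain strings, and each file is assumed to map to exactly
one split. The point is only that if the checkpoint carries the already
discovered paths next to the remaining splits, a restored enumerator can
skip files it has already turned into splits, similar to what
ContinuousFileSplitEnumerator does with pathsAlreadyProcessed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, NOT Flink's API: a bounded ("static") enumerator whose
// checkpoint stores the already discovered paths next to the remaining splits,
// so a restored instance does not create splits for the same files again.
class StaticEnumeratorRestoreSketch {

    // Hypothetical checkpoint state; paths and splits are plain strings here.
    static final class Checkpoint {
        final List<String> remainingSplits;
        final Set<String> alreadyDiscoveredPaths;

        Checkpoint(List<String> remainingSplits, Set<String> alreadyDiscoveredPaths) {
            this.remainingSplits = remainingSplits;
            this.alreadyDiscoveredPaths = alreadyDiscoveredPaths;
        }
    }

    private final List<String> pendingSplits = new ArrayList<>();
    private final Set<String> alreadyDiscoveredPaths = new HashSet<>();

    // Pass null for a fresh start, or a Checkpoint when restoring.
    StaticEnumeratorRestoreSketch(Checkpoint checkpoint) {
        if (checkpoint != null) {
            pendingSplits.addAll(checkpoint.remainingSplits);
            alreadyDiscoveredPaths.addAll(checkpoint.alreadyDiscoveredPaths);
        }
    }

    // Creates one split per input path, skipping paths already discovered.
    void addInputPaths(Collection<String> inputPaths) {
        for (String path : inputPaths) {
            // Set.add returns false if the path is already in the set, i.e. a
            // split for it was created before the checkpoint was taken.
            if (alreadyDiscoveredPaths.add(path)) {
                pendingSplits.add(path);
            }
        }
    }

    // Hands out the next pending split, or null when none are left.
    String nextSplit() {
        return pendingSplits.isEmpty() ? null : pendingSplits.remove(0);
    }

    // Copies the current state so later changes do not affect the snapshot.
    Checkpoint snapshot() {
        return new Checkpoint(
                new ArrayList<>(pendingSplits),
                new HashSet<>(alreadyDiscoveredPaths));
    }

    List<String> pendingSplits() {
        return new ArrayList<>(pendingSplits);
    }
}
```

For example, if a first run discovers a.csv, b.csv and c.csv and hands
out the split for a.csv before the checkpoint, a restored instance that
is given the same input paths again keeps only b.csv and c.csv pending
instead of re-creating a split for a.csv.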

@Krzysztof Chmielewski, can you create a ticket for that?

Best,
Fabian


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
<krzysiek.chmielew...@gmail.com> wrote:
>
> Hi,
> Yes, I know that ContinuousFileSplitEnumerator has to continuously scan
> the monitored folder for new files and StaticFileSplitEnumerator does
> not; this is clear.
>
> However, I was asking about a different scenario: restoring from a
> checkpoint.
> FileSource can process many files, not only one. The underlying API uses
> an array of paths, not just a single Path.
>
> If I understand correctly, when we are recovering from a checkpoint, for
> example due to a JobManager issue, FileEnumerator will create an array of
> splits and pass it to StaticFileSplitEnumerator.
>
> The same goes for ContinuousFileSplitEnumerator. However, when
> ContinuousFileSplitEnumerator is started, it iterates through the Path[]
> array, checks which files were already processed, and skips them using
> the pathsAlreadyProcessed set, hence not creating splits for those files.
>
> However, it seems that StaticFileSplitEnumerator will reprocess files that
> were already used for split creation. In the case of checkpoint
> restoration, it does not check whether a file was already processed.
>
> Regards,
> Krzysztof Chmielewski
>
> On Thu, Jan 6, 2022, 03:21 Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>> Hi!
>>
>> Do you mean the pathsAlreadyProcessed set in ContinuousFileSplitEnumerator?
>>
>> This is because ContinuousFileSplitEnumerator has to continuously add new
>> files to splitAssigner, while StaticFileSplitEnumerator does not. The
>> pathsAlreadyProcessed set records the paths already discovered by
>> ContinuousFileSplitEnumerator so that it will not add the same file to
>> splitAssigner twice. StaticFileSplitEnumerator does not need to discover
>> new files, and all files have already been recorded in its splitAssigner,
>> so it does not need the pathsAlreadyProcessed set.
>>
>> For the detailed logic, check the callers of the constructors of both
>> enumerators.
>>
>> On Thu, Jan 6, 2022, 07:04 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>>>
>>> Hi,
>>> Why does StaticFileSplitEnumerator in FileSource not track already
>>> processed files the way ContinuousFileSplitEnumerator does?
>>>
>>> I'm thinking about a scenario where we have a bounded FileSource that
>>> reads a lot of files and streams their content to Kafka. If there is a
>>> job/cluster restart, then we will process the same files again.
>>>
>>> Regards,
>>> Krzysztof Chmielewski
