Hi team,
I'm also interested in whether there is Java code available to
determine how far a Flink job has progressed through the files in a
directory. Additionally, I'm curious where Flink stores the details of
the processed files.

Thanks and regards,
Arjun S

On Mon, 30 Oct 2023 at 10:48, arjun s <arjunjoice...@gmail.com> wrote:

> Hi team,
>
> I appreciate the information provided. Is there a way to automatically
> relocate processed files out of a directory once a Flink job has
> finished processing them? I'm particularly keen to understand how this
> use case is typically handled in production environments. I'm also
> curious whether there's a way to track which files in the directory
> have been processed by the Flink job.
> Looking forward to your response.
>
> Thank you. Regards
> Arjun
>
> On Sat, 28 Oct 2023 at 23:42, Alexander Fedulov <
> alexander.fedu...@gmail.com> wrote:
>
>> > Or was it the querying of the checkpoints you were advising against?
>>
>> Yes, I meant the approach, not file removal itself. Mainly because
>> exactly how FileSource stores its state is an implementation detail,
>> and there are no external guarantees that it stays consistent even
>> between minor versions.
>> On top of that, the original author of the StateProcessor API has moved
>> to another project, so it has not been actively worked on recently. I am
>> not sure it is even possible to access the FileSource state directly with
>> it since FLIP-27 sources do not use the OperatorState abstraction directly
>> [1].
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L510
>>
>> Best,
>> Alexander
>>
>> On Sat, 28 Oct 2023 at 16:13, Andrew Otto <o...@wikimedia.org> wrote:
>>
>>> > This is not a robust solution, I would advise against it.
>>> Oh no?  Am curious as to why not.  It seems not dissimilar to how Kafka
>>> topic retention works: the messages are removed after some time period
>>> (hopefully after they are processed), so why would it be bad to remove
>>> files that are already processed?
>>>
>>> Or was it the querying of the checkpoints you were advising against?
>>>
>>> To be sure, I was referring to moving the previously processed files
>>> away, not the checkpoints themselves.
>>>
>>> On Fri, Oct 27, 2023 at 12:45 PM Alexander Fedulov <
>>> alexander.fedu...@gmail.com> wrote:
>>>
>>>> > I wonder if you could use this fact to query the committed
>>>> checkpoints and move them away after the job is done.
>>>>
>>>> This is not a robust solution, I would advise against it.
>>>>
>>>> Best,
>>>> Alexander
>>>>
>>>> On Fri, 27 Oct 2023 at 16:41, Andrew Otto <o...@wikimedia.org> wrote:
>>>>
>>>>> For moving the files:
>>>>> > It will keep the files as is and remember the name of the file read
>>>>> in checkpointed state to ensure it doesnt read the same file twice.
>>>>>
>>>>> I wonder if you could use this fact to query the committed checkpoints
>>>>> and move them away after the job is done.  I think it should even be safe
>>>>> to do this outside of the Flink job periodically (cron, whatever), because
>>>>> on restart it won't reprocess the files that have been committed in the
>>>>> checkpoints.
>>>>>
>>>>>
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/libs/state_processor_api/#reading-state
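>>>>>
>>>>> Roughly what I had in mind, as a completely untested sketch (the
>>>>> operator uid and state name below are pure guesses -- I haven't
>>>>> checked what FileSource actually stores, or under what name):
>>>>>
>>>>> import org.apache.flink.api.common.typeinfo.Types;
>>>>> import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
>>>>> import org.apache.flink.state.api.OperatorIdentifier;
>>>>> import org.apache.flink.state.api.SavepointReader;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>> StreamExecutionEnvironment env =
>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> // point this at a retained checkpoint or savepoint directory
>>>>> SavepointReader savepoint = SavepointReader.read(
>>>>>         env, "file:///checkpoints/chk-42", new HashMapStateBackend());
>>>>> // "file-source" / "processed-files" are hypothetical; the real uid
>>>>> // and state name would have to be dug out of the job itself
>>>>> DataStream<String> processed = savepoint.readListState(
>>>>>         OperatorIdentifier.forUid("file-source"),
>>>>>         "processed-files", Types.STRING);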
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 27, 2023 at 1:13 AM arjun s <arjunjoice...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi team, thanks for your quick response.
>>>>>> I have a question about file processing in the event of a job
>>>>>> restart. When the job restarts, we have trouble tracking which files
>>>>>> have been processed and which are still pending. Is there a way to
>>>>>> seamlessly resume processing files from where they were left off,
>>>>>> particularly when we have to submit and restart the job manually
>>>>>> after a server or application restart? The problem is that after a
>>>>>> restart the job processes all the files in the directory from the
>>>>>> beginning, and I'm looking for a solution to address this.
>>>>>>
>>>>>> Thanks and regards,
>>>>>> Arjun
>>>>>>
>>>>>> On Fri, 27 Oct 2023 at 07:29, Chirag Dewan <chirag.dewa...@yahoo.in>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Arjun,
>>>>>>>
>>>>>>> Flink's FileSource doesn't move or delete files as of now. It
>>>>>>> keeps the files as they are and remembers the names of the files it
>>>>>>> has already read in checkpointed state, to ensure it doesn't read
>>>>>>> the same file twice.
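>>>>>>>
>>>>>>> Because that list lives in checkpoint state, it only survives a
>>>>>>> restart if checkpointing is enabled and the job is resumed from a
>>>>>>> retained checkpoint. A minimal sketch of the relevant setup
>>>>>>> (untested, assuming Flink 1.17; the interval is a placeholder):
>>>>>>>
>>>>>>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>
>>>>>>> StreamExecutionEnvironment env =
>>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> // snapshot the source's progress every 60s
>>>>>>> env.enableCheckpointing(60_000);
>>>>>>> // keep the last checkpoint when the job is cancelled, so a manual
>>>>>>> // restart can resume from it: bin/flink run -s <checkpoint-path> ...
>>>>>>> env.getCheckpointConfig().setExternalizedCheckpointCleanup(
>>>>>>>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);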
>>>>>>>
>>>>>>> Flink's source API works in such a way that a single Enumerator
>>>>>>> runs on the JobManager. The enumerator is responsible for listing
>>>>>>> the files and splitting them into smaller units. These units can be
>>>>>>> a complete file (for row formats) or splits within a file (for bulk
>>>>>>> formats). The actual reading is done by SplitReaders in the
>>>>>>> TaskManagers. This way, only the reading happens concurrently, and
>>>>>>> the enumerator can track when each file has been read completely.
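>>>>>>>
>>>>>>> For reference, a rough sketch of such a job reading a directory
>>>>>>> continuously and writing to Kafka (untested, assuming Flink 1.17
>>>>>>> with flink-connector-kafka on the classpath; the path, topic and
>>>>>>> broker address are placeholders):
>>>>>>>
>>>>>>> import java.time.Duration;
>>>>>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>>>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>>>> import org.apache.flink.connector.file.src.FileSource;
>>>>>>> import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
>>>>>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
>>>>>>> import org.apache.flink.connector.kafka.sink.KafkaSink;
>>>>>>> import org.apache.flink.core.fs.Path;
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>
>>>>>>> public class FilesToKafka {
>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>         StreamExecutionEnvironment env =
>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>         env.enableCheckpointing(60_000);
>>>>>>>
>>>>>>>         // the enumerator lists /data/incoming every 10s; already
>>>>>>>         // completed files are remembered in checkpointed state
>>>>>>>         FileSource<String> source = FileSource
>>>>>>>                 .forRecordStreamFormat(new TextLineInputFormat(),
>>>>>>>                         new Path("/data/incoming"))
>>>>>>>                 .monitorContinuously(Duration.ofSeconds(10))
>>>>>>>                 .build();
>>>>>>>
>>>>>>>         KafkaSink<String> sink = KafkaSink.<String>builder()
>>>>>>>                 .setBootstrapServers("localhost:9092")
>>>>>>>                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>>>>>>>                         .setTopic("file-contents")
>>>>>>>                         .setValueSerializationSchema(new SimpleStringSchema())
>>>>>>>                         .build())
>>>>>>>                 .build();
>>>>>>>
>>>>>>>         env.fromSource(source, WatermarkStrategy.noWatermarks(),
>>>>>>>                 "file-source").sinkTo(sink);
>>>>>>>         env.execute("files-to-kafka");
>>>>>>>     }
>>>>>>> }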
>>>>>>>
>>>>>>> You can read more about Flink sources here
>>>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/sources/>
>>>>>>> and about the FileSystem connector here
>>>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, 26 October, 2023 at 06:53:23 pm IST, arjun s <
>>>>>>> arjunjoice...@gmail.com> wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hello team,
>>>>>>> I'm currently in the process of configuring a Flink job. This job
>>>>>>> entails reading files from a specified directory and then transmitting 
>>>>>>> the
>>>>>>> data to a Kafka sink. I've already successfully designed a Flink job 
>>>>>>> that
>>>>>>> reads the file contents in a streaming manner and effectively sends 
>>>>>>> them to
>>>>>>> Kafka. However, my specific requirement is a bit more intricate. I need 
>>>>>>> the
>>>>>>> job to not only read these files and push the data to Kafka but also
>>>>>>> relocate the processed file to a different directory once all of its
>>>>>>> contents have been processed. Following this, the job should seamlessly
>>>>>>> transition to processing the next file in the source directory.
>>>>>>> Additionally, I have some concerns regarding how the job will behave if 
>>>>>>> it
>>>>>>> encounters a restart. Could you please advise if this is achievable, 
>>>>>>> and if
>>>>>>> so, provide guidance or code to implement it?
>>>>>>>
>>>>>>> I'm also quite interested in how the job will handle situations
>>>>>>> where the source has a parallelism greater than 2 or 3, and how it can
>>>>>>> accurately monitor the completion of reading all contents in each file.
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Arjun
>>>>>>>
>>>>>>
