>> After the savepoint state has been written, the sink might start new
.in-progress files. These files are not part of the savepoint but renamed
to .pending in close().
>> On restore all pending files that are part of the savepoint are moved
into final state (and possibly truncated). See
handlePendingInProgressFiles() method.
>> Pending files that are not part of the savepoint (because they were
created later between taking the savepoint and shutting the job down) are
not touched and remain >> as .pending files.

>> These should be the .pending files that you observe. Since they contain
data that is not part of the savepoint, it should be save to delete them.
>> If you keep them, you will have at-least-once output.


Is the above also possible with in-progress file ?  I had a situation where
we see such  a dangling file through a restart on error.


On Wed, Feb 21, 2018 at 8:52 AM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Thank you Fabian,
>
>     What is more important ( and I think you might have addressed it in
> your post so sorry for being a little obtuse ) is that deleting them does
> not violate "at-least-once" delivery.  And if that is a definite than we
> are fine with it, though we will test it further.
>
> Thanks and Regards.
>
>
>
> On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vishal, hi Mu,
>>
>> After the savepoint state has been written, the sink might start new
>> .in-progress files. These files are not part of the savepoint but renamed
>> to .pending in close().
>> On restore all pending files that are part of the savepoint are moved
>> into final state (and possibly truncated). See
>> handlePendingInProgressFiles() method.
>> Pending files that are not part of the savepoint (because they were
>> created later between taking the savepoint and shutting the job down) are
>> not touched and remain as .pending files.
>>
>> These should be the .pending files that you observe. Since they contain
>> data that is not part of the savepoint, it should be save to delete them.
>> If you keep them, you will have at-least-once output.
>>
>> Best, Fabian
>>
>>
>> 2018-02-21 5:04 GMT+01:00 Mu Kong <kong.mu....@gmail.com>:
>>
>>> Hi Aljoscha,
>>>
>>> Thanks for confirming that fact that Flink doesn't clean up pending
>>> files.
>>> Is that safe to clean(remove) all the pending files after cancel(w/ or
>>> w/o savepointing) or failover?
>>> If we do that, will we lose some data?
>>>
>>> Thanks!
>>>
>>> Best,
>>> Mu
>>>
>>>
>>>
>>>
>>> On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Sorry, but just wanted to confirm that  the assertion "at-least-once"
>>>> delivery  true if there is a dangling pending file ?
>>>>
>>>> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> That is fine, till flink assure at-least-once semantics ?
>>>>>
>>>>> If the contents of a .pending file, through the turbulence ( restarts
>>>>> etc )  are assured to be in another file than anything starting with "_"
>>>>> underscore will by default ignored by hadoop ( hive or MR etc ).
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <
>>>>> aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Sorry for the confusion. The framework (Flink) does currently not do
>>>>>> any cleanup of pending files, yes.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> >> You should only have these dangling pending files after a
>>>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>>>> periodically clean up older pending files.
>>>>>>
>>>>>> A little confused. Is that what the framework should do, or us as
>>>>>> part of some cleanup job ?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <
>>>>>> aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> The BucketingSink does not clean up pending files on purpose. In a
>>>>>>> distributed setting, and especially with rescaling of Flink operators, 
>>>>>>> it
>>>>>>> is sufficiently hard to figure out which of the pending files you 
>>>>>>> actually
>>>>>>> can delete and which of them you have to leave because they will get 
>>>>>>> moved
>>>>>>> to "final" as part of recovering from a checkpoint on some other 
>>>>>>> parallel
>>>>>>> instance of the sink.
>>>>>>>
>>>>>>> You should only have these dangling pending files after a
>>>>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>>>>> periodically clean up older pending files.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>
>>>>>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> what pending files should indeed get eventually finalized. This
>>>>>>> happens on a checkpoint complete notification. Thus, what you report 
>>>>>>> seems
>>>>>>> not right. Maybe Aljoscha can shed a bit more light into the problem.
>>>>>>>
>>>>>>> In order to further debug the problem, it would be really helpful to
>>>>>>> get access to DEBUG log files of a TM which runs the BucketingSink.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Vishal,
>>>>>>>>
>>>>>>>> I have the same concern about save pointing in BucketingSink.
>>>>>>>> As for your question, I think before the pending files get cleared
>>>>>>>> in handleRestoredBucketState .
>>>>>>>> They are finalized in notifyCheckpointComplete
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>>>>>>
>>>>>>>> I'm looking into this part of the source code now, since we are
>>>>>>>> experiencing some unclosed files after check pointing.
>>>>>>>> It would be great if you can share more if you find something new
>>>>>>>> about your problem, which might help with our problem.
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Mu
>>>>>>>>
>>>>>>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48
>>>>>>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid
>>>>>>>>> -length
>>>>>>>>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
>>>>>>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>>>>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17
>>>>>>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid
>>>>>>>>> -length
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is strange, we had a few retries b'coz of an OOM on one of
>>>>>>>>> the TMs and we see this situation. 2 files ( on either sides )  that 
>>>>>>>>> were
>>>>>>>>> dealt with fine but a dangling .pending file. I am sure this is not 
>>>>>>>>> what is
>>>>>>>>> meant to be.   We I think have an edge condition and looking at the 
>>>>>>>>> code it
>>>>>>>>> is not obvious. May be some one who wrote the code can shed some 
>>>>>>>>> light as
>>>>>>>>> to how can this happen.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> without --allowNonRestoredState, on a suspend/resume we do see
>>>>>>>>>> the length file along with the finalized file ( finalized during 
>>>>>>>>>> resume )
>>>>>>>>>>
>>>>>>>>>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57
>>>>>>>>>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>>>>>>
>>>>>>>>>> that does makes much more sense.
>>>>>>>>>>
>>>>>>>>>> I guess we should document --allowNonRestoredState better ? It
>>>>>>>>>> seems it actually drops state ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is 1.4 BTW.  I am not sure that I am reading this correctly
>>>>>>>>>>> but the lifecycle of cancel/resume is 2 steps
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 1. Cancel job with SP
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> closeCurrentPartFile
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>>>>>>
>>>>>>>>>>> is called from close()
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> and that moves files to pending state.  That I would presume is
>>>>>>>>>>> called when one does a cancel.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2. The restore on resume
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>>>>>>
>>>>>>>>>>> calls
>>>>>>>>>>>
>>>>>>>>>>> handleRestoredBucketState
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>>>>>>
>>>>>>>>>>> clears the pending files from state without finalizing them?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That does not seem to be right. I must be reading the code
>>>>>>>>>>> totally wrong ?
>>>>>>>>>>>
>>>>>>>>>>> I am not sure also whether --allowNonRestoredState is skipping
>>>>>>>>>>> getting the state . At least https://ci.apache.org/pr
>>>>>>>>>>> ojects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is
>>>>>>>>>>> not exactly clear what it does if we add an operator ( GDF I think 
>>>>>>>>>>> will add
>>>>>>>>>>> a new operator in the DAG without state even if stateful, in my 
>>>>>>>>>>> case the
>>>>>>>>>>> Map operator is not even stateful )
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks and please bear with me if this is all something pretty
>>>>>>>>>>> simple.
>>>>>>>>>>>
>>>>>>>>>>> Vishal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What should be the behavior of BucketingSink vis a vis state (
>>>>>>>>>>>> pending , inprogess and finalization ) when we suspend and resume ?
>>>>>>>>>>>>
>>>>>>>>>>>> So I did this
>>>>>>>>>>>>
>>>>>>>>>>>> * I had a pipe writing to hdfs suspend and resume using
>>>>>>>>>>>> --allowNonRestoredState as in I had added a harmless
>>>>>>>>>>>> MapOperator ( stateless ).
>>>>>>>>>>>>
>>>>>>>>>>>> * I see that a file on hdfs, the file being written to ( before
>>>>>>>>>>>> the cancel with save point )  go into a pending state
>>>>>>>>>>>> _part-0-21.pending
>>>>>>>>>>>>
>>>>>>>>>>>> * I see a new file being written to in the resumed pipe
>>>>>>>>>>>> _part-0-22.in-progress.
>>>>>>>>>>>>
>>>>>>>>>>>> What  I do not see is the file in  _part-0-21.pending being
>>>>>>>>>>>> finalized ( as in renamed to a just part-0-21. I would have 
>>>>>>>>>>>> assumed that
>>>>>>>>>>>> would be the case in this controlled suspend/resume circumstance. 
>>>>>>>>>>>> Further
>>>>>>>>>>>> it is a rename and hdfs mv is not an expensive operation.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Am I understanding the process correct and it yes any pointers ?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Vishal
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to