Thank you Fabian,

    What is more important ( and I think you might have addressed it in
your post so sorry for being a little obtuse ) is that deleting them does
not violate "at-least-once" delivery.  And if that is a definite than we
are fine with it, though we will test it further.

Thanks and Regards.



On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vishal, hi Mu,
>
> After the savepoint state has been written, the sink might start new
> .in-progress files. These files are not part of the savepoint but renamed
> to .pending in close().
> On restore all pending files that are part of the savepoint are moved into
> final state (and possibly truncated). See handlePendingInProgressFiles()
> method.
> Pending files that are not part of the savepoint (because they were
> created later between taking the savepoint and shutting the job down) are
> not touched and remain as .pending files.
>
> These should be the .pending files that you observe. Since they contain
> data that is not part of the savepoint, it should be save to delete them.
> If you keep them, you will have at-least-once output.
>
> Best, Fabian
>
>
> 2018-02-21 5:04 GMT+01:00 Mu Kong <kong.mu....@gmail.com>:
>
>> Hi Aljoscha,
>>
>> Thanks for confirming that fact that Flink doesn't clean up pending files.
>> Is that safe to clean(remove) all the pending files after cancel(w/ or
>> w/o savepointing) or failover?
>> If we do that, will we lose some data?
>>
>> Thanks!
>>
>> Best,
>> Mu
>>
>>
>>
>>
>> On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Sorry, but just wanted to confirm that  the assertion "at-least-once"
>>> delivery  true if there is a dangling pending file ?
>>>
>>> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> That is fine, till flink assure at-least-once semantics ?
>>>>
>>>> If the contents of a .pending file, through the turbulence ( restarts
>>>> etc )  are assured to be in another file than anything starting with "_"
>>>> underscore will by default ignored by hadoop ( hive or MR etc ).
>>>>
>>>>
>>>>
>>>> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Sorry for the confusion. The framework (Flink) does currently not do
>>>>> any cleanup of pending files, yes.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> >> You should only have these dangling pending files after a
>>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>>> periodically clean up older pending files.
>>>>>
>>>>> A little confused. Is that what the framework should do, or us as part
>>>>> of some cleanup job ?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <
>>>>> aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The BucketingSink does not clean up pending files on purpose. In a
>>>>>> distributed setting, and especially with rescaling of Flink operators, it
>>>>>> is sufficiently hard to figure out which of the pending files you 
>>>>>> actually
>>>>>> can delete and which of them you have to leave because they will get 
>>>>>> moved
>>>>>> to "final" as part of recovering from a checkpoint on some other parallel
>>>>>> instance of the sink.
>>>>>>
>>>>>> You should only have these dangling pending files after a
>>>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>>>> periodically clean up older pending files.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> what pending files should indeed get eventually finalized. This
>>>>>> happens on a checkpoint complete notification. Thus, what you report 
>>>>>> seems
>>>>>> not right. Maybe Aljoscha can shed a bit more light into the problem.
>>>>>>
>>>>>> In order to further debug the problem, it would be really helpful to
>>>>>> get access to DEBUG log files of a TM which runs the BucketingSink.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> I have the same concern about save pointing in BucketingSink.
>>>>>>> As for your question, I think before the pending files get cleared
>>>>>>> in handleRestoredBucketState .
>>>>>>> They are finalized in notifyCheckpointComplete
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>>>>>
>>>>>>> I'm looking into this part of the source code now, since we are
>>>>>>> experiencing some unclosed files after check pointing.
>>>>>>> It would be great if you can share more if you find something new
>>>>>>> about your problem, which might help with our problem.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Mu
>>>>>>>
>>>>>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48
>>>>>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>>>>>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
>>>>>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>>>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17
>>>>>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>>>>>
>>>>>>>>
>>>>>>>> This is strange, we had a few retries b'coz of an OOM on one of the
>>>>>>>> TMs and we see this situation. 2 files ( on either sides )  that were 
>>>>>>>> dealt
>>>>>>>> with fine but a dangling .pending file. I am sure this is not what is 
>>>>>>>> meant
>>>>>>>> to be.   We I think have an edge condition and looking at the code it 
>>>>>>>> is
>>>>>>>> not obvious. May be some one who wrote the code can shed some light as 
>>>>>>>> to
>>>>>>>> how can this happen.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> without --allowNonRestoredState, on a suspend/resume we do see
>>>>>>>>> the length file along with the finalized file ( finalized during 
>>>>>>>>> resume )
>>>>>>>>>
>>>>>>>>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57
>>>>>>>>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>>>>>
>>>>>>>>> that does makes much more sense.
>>>>>>>>>
>>>>>>>>> I guess we should document --allowNonRestoredState better ? It
>>>>>>>>> seems it actually drops state ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> This is 1.4 BTW.  I am not sure that I am reading this correctly
>>>>>>>>>> but the lifecycle of cancel/resume is 2 steps
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 1. Cancel job with SP
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> closeCurrentPartFile
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>>>>>
>>>>>>>>>> is called from close()
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> and that moves files to pending state.  That I would presume is
>>>>>>>>>> called when one does a cancel.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2. The restore on resume
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>>>>>
>>>>>>>>>> calls
>>>>>>>>>>
>>>>>>>>>> handleRestoredBucketState
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors
>>>>>>>>>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>>>>>>>>>> treaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>>>>>
>>>>>>>>>> clears the pending files from state without finalizing them?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> That does not seem to be right. I must be reading the code
>>>>>>>>>> totally wrong ?
>>>>>>>>>>
>>>>>>>>>> I am not sure also whether --allowNonRestoredState is skipping
>>>>>>>>>> getting the state . At least https://ci.apache.org/pr
>>>>>>>>>> ojects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is
>>>>>>>>>> not exactly clear what it does if we add an operator ( GDF I think 
>>>>>>>>>> will add
>>>>>>>>>> a new operator in the DAG without state even if stateful, in my case 
>>>>>>>>>> the
>>>>>>>>>> Map operator is not even stateful )
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks and please bear with me if this is all something pretty
>>>>>>>>>> simple.
>>>>>>>>>>
>>>>>>>>>> Vishal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What should be the behavior of BucketingSink vis a vis state (
>>>>>>>>>>> pending , inprogess and finalization ) when we suspend and resume ?
>>>>>>>>>>>
>>>>>>>>>>> So I did this
>>>>>>>>>>>
>>>>>>>>>>> * I had a pipe writing to hdfs suspend and resume using
>>>>>>>>>>> --allowNonRestoredState as in I had added a harmless
>>>>>>>>>>> MapOperator ( stateless ).
>>>>>>>>>>>
>>>>>>>>>>> * I see that a file on hdfs, the file being written to ( before
>>>>>>>>>>> the cancel with save point )  go into a pending state
>>>>>>>>>>> _part-0-21.pending
>>>>>>>>>>>
>>>>>>>>>>> * I see a new file being written to in the resumed pipe
>>>>>>>>>>> _part-0-22.in-progress.
>>>>>>>>>>>
>>>>>>>>>>> What  I do not see is the file in  _part-0-21.pending being
>>>>>>>>>>> finalized ( as in renamed to a just part-0-21. I would have assumed 
>>>>>>>>>>> that
>>>>>>>>>>> would be the case in this controlled suspend/resume circumstance. 
>>>>>>>>>>> Further
>>>>>>>>>>> it is a rename and hdfs mv is not an expensive operation.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Am I understanding the process correct and it yes any pointers ?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Vishal
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to