That is fine, as long as Flink assures at-least-once semantics?

If the contents of a .pending file are, through the turbulence (restarts
etc.), assured to end up in another file, then anything starting with "_"
(underscore) will by default be ignored by Hadoop (Hive, MR, etc.).
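
To make the underscore point concrete, here is a minimal sketch of the naming
convention I am relying on. It does not call Hadoop itself; the class and
method names are made up for illustration, and it only assumes that the
default input filter skips names beginning with "_" or ".".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HiddenFileCheck {

    // Assumption: mirrors the default hidden-file convention of Hadoop input
    // formats, where names starting with "_" or "." are skipped by readers.
    public static boolean isVisibleToHadoopJobs(String fileName) {
        return !(fileName.startsWith("_") || fileName.startsWith("."));
    }

    // Keeps only the names a downstream Hive/MR job would actually read.
    public static List<String> visibleFiles(List<String> fileNames) {
        return fileNames.stream()
                .filter(HiddenFileCheck::isVisibleToHadoopJobs)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList(
                "_part-0-18.valid-length",
                "_part-0-19.pending",
                "part-0-18",
                "part-0-20");
        // The dangling pending file and the valid-length markers are filtered out.
        System.out.println(visibleFiles(listing)); // prints [part-0-18, part-0-20]
    }
}
```

So a dangling _part-0-19.pending would cost disk space but should not be read
twice downstream, provided its records also made it into a finalized file.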

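For the periodic cleanup suggested in the quoted mail below, a rough sketch of
how we might pick candidates. This is only the selection logic over a directory
listing, not a Flink or HDFS API; the Entry class, the method name and the
two-day threshold are hypothetical. It assumes the "_" prefix and ".pending"
suffix seen in our listings, and that the age threshold is chosen far larger
than any realistic recovery time, since a pending file may still be moved to
final by a later checkpoint.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StalePendingFiles {

    // Hypothetical holder for one directory listing entry.
    public static final class Entry {
        public final String name;
        public final Instant modified;

        public Entry(String name, Instant modified) {
            this.name = name;
            this.modified = modified;
        }
    }

    // Returns names of pending files last modified before (now - maxAge).
    public static List<String> stalePending(List<Entry> listing, Instant now, Duration maxAge) {
        Instant cutoff = now.minus(maxAge);
        List<String> result = new ArrayList<>();
        for (Entry e : listing) {
            boolean pending = e.name.startsWith("_") && e.name.endsWith(".pending");
            if (pending && e.modified.isBefore(cutoff)) {
                result.add(e.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-02-19T00:00:00Z");
        List<Entry> listing = Arrays.asList(
                new Entry("_part-0-19.pending", Instant.parse("2018-02-14T19:15:00Z")),
                // Made-up recent pending file that must be left alone.
                new Entry("_part-0-99.pending", Instant.parse("2018-02-18T23:00:00Z")),
                new Entry("part-0-18", Instant.parse("2018-02-14T18:48:00Z")));
        System.out.println(stalePending(listing, now, Duration.ofDays(2)));
        // prints [_part-0-19.pending]
    }
}
```

The actual delete would then be a separate step over the returned names, which
keeps the risky part easy to dry-run first.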


On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> Sorry for the confusion. The framework (Flink) currently does not do any
> cleanup of pending files, yes.
>
> Best,
> Aljoscha
>
>
> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> >> You should only have these dangling pending files after a
> failure-recovery cycle, as you noticed. My suggestion would be to
> periodically clean up older pending files.
>
> A little confused. Is that what the framework should do, or should we do it
> as part of some cleanup job?
>
>
>
>
>
> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> The BucketingSink does not clean up pending files on purpose. In a
>> distributed setting, and especially with rescaling of Flink operators, it
>> is sufficiently hard to figure out which of the pending files you actually
>> can delete and which of them you have to leave because they will get moved
>> to "final" as part of recovering from a checkpoint on some other parallel
>> instance of the sink.
>>
>> You should only have these dangling pending files after a
>> failure-recovery cycle, as you noticed. My suggestion would be to
>> periodically clean up older pending files.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Vishal,
>>
>> The pending files should indeed eventually get finalized. This happens
>> on a checkpoint complete notification. Thus, what you report does not seem
>> right. Maybe Aljoscha can shed a bit more light on the problem.
>>
>> In order to further debug the problem, it would be really helpful to get
>> access to DEBUG log files of a TM which runs the BucketingSink.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> I have the same concern about savepointing in BucketingSink.
>>> As for your question, I think that before the pending files get cleared in
>>> handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>
>>> I'm looking into this part of the source code now, since we are
>>> experiencing some unclosed files after checkpointing.
>>> It would be great if you could share more if you find something new about
>>> your problem, which might help with ours.
>>>
>>> Best regards,
>>> Mu
>>>
>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48
>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17
>>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>
>>>>
>>>> This is strange. We had a few retries because of an OOM on one of the TMs,
>>>> and we see this situation: two files (on either side) that were dealt
>>>> with fine, but a dangling .pending file. I am sure this is not what is
>>>> meant to happen. I think we have an edge condition, and looking at the
>>>> code it is not obvious. Maybe someone who wrote the code can shed some
>>>> light on how this can happen.
>>>>
>>>>
>>>>
>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Without --allowNonRestoredState, on a suspend/resume we do see the
>>>>> length file along with the finalized file (finalized during resume):
>>>>>
>>>>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57
>>>>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>
>>>>> That does make much more sense.
>>>>>
>>>>> I guess we should document --allowNonRestoredState better? It seems
>>>>> it actually drops state?
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> This is 1.4, BTW. I am not sure that I am reading this correctly, but
>>>>>> the lifecycle of cancel/resume is 2 steps:
>>>>>>
>>>>>>
>>>>>>
>>>>>> 1. Cancel job with SP
>>>>>>
>>>>>>
>>>>>> closeCurrentPartFile
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>
>>>>>> is called from close()
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>
>>>>>>
>>>>>> and that moves files to pending state.  That I would presume is
>>>>>> called when one does a cancel.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2. The restore on resume
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>
>>>>>> calls
>>>>>>
>>>>>> handleRestoredBucketState
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>
>>>>>> which clears the pending files from state without finalizing them?
>>>>>>
>>>>>>
>>>>>>
>>>>>> That does not seem to be right. I must be reading the code totally
>>>>>> wrong?
>>>>>>
>>>>>> I am also not sure whether --allowNonRestoredState is skipping
>>>>>> getting the state. At least
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>>>>> is not exactly clear about what it does if we add an operator (GDF I
>>>>>> think will add a new operator in the DAG without state even if stateful;
>>>>>> in my case the Map operator is not even stateful).
>>>>>>
>>>>>>
>>>>>> Thanks and please bear with me if this is all something pretty simple.
>>>>>>
>>>>>> Vishal
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> What should be the behavior of BucketingSink vis-a-vis state
>>>>>>> (pending, in-progress and finalization) when we suspend and resume?
>>>>>>>
>>>>>>> So I did this:
>>>>>>>
>>>>>>> * I had a pipe writing to HDFS suspend and resume using
>>>>>>> --allowNonRestoredState, as I had added a harmless (stateless)
>>>>>>> MapOperator.
>>>>>>>
>>>>>>> * I see that the file on HDFS that was being written to (before the
>>>>>>> cancel with savepoint) goes into a pending state:
>>>>>>> _part-0-21.pending
>>>>>>>
>>>>>>> * I see a new file being written to in the resumed pipe:
>>>>>>> _part-0-22.in-progress
>>>>>>>
>>>>>>> What I do not see is the file _part-0-21.pending being finalized
>>>>>>> (as in renamed to just part-0-21). I would have assumed that
>>>>>>> would be the case in this controlled suspend/resume circumstance.
>>>>>>> Further, it is a rename, and an HDFS mv is not an expensive operation.
>>>>>>>
>>>>>>>
>>>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Vishal
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
>
