Hi Vishal,

The pending files should indeed eventually get finalized. This happens on
a checkpoint-complete notification. Thus, what you report does not seem right.
Maybe Aljoscha can shed a bit more light on the problem.
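
To make the expected lifecycle concrete, here is a minimal Java sketch of it.
It is a toy model, not the actual BucketingSink code: the class name
PendingFileLifecycle, the method closePartFile, the getters and the file name
are made up for illustration, and the rename on HDFS is replaced by moving a
name between in-memory lists. It assumes that pending files are recorded per
checkpoint and are finalized only once that checkpoint (or a later one) has
been confirmed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of the pending-file lifecycle; NOT the actual BucketingSink code. */
public class PendingFileLifecycle {
    // Part files closed since the last snapshot (".pending" suffix on disk).
    private final List<String> pendingFiles = new ArrayList<>();
    // Pending files grouped by the checkpoint that recorded them.
    private final TreeMap<Long, List<String>> pendingFilesPerCheckpoint = new TreeMap<>();
    // Part files that have been renamed to their final name.
    private final List<String> finalizedFiles = new ArrayList<>();

    /** Closing a part file moves it to the pending state. */
    public void closePartFile(String name) {
        pendingFiles.add(name);
    }

    /** A snapshot records which pending files belong to this checkpoint. */
    public void snapshotState(long checkpointId) {
        pendingFilesPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
        pendingFiles.clear();
    }

    /** Finalizes every pending file recorded by this or an earlier checkpoint. */
    public void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> confirmed = pendingFilesPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : confirmed.values()) {
            finalizedFiles.addAll(files); // stands in for the rename on HDFS
        }
        confirmed.clear(); // the view is backed by the map, so this drops the entries
    }

    public List<String> getFinalizedFiles() {
        return new ArrayList<>(finalizedFiles);
    }

    /** All files that are still pending, whether snapshotted or not. */
    public List<String> getPendingFiles() {
        List<String> all = new ArrayList<>();
        for (List<String> files : pendingFilesPerCheckpoint.values()) {
            all.addAll(files);
        }
        all.addAll(pendingFiles);
        return all;
    }

    public static void main(String[] args) {
        PendingFileLifecycle sink = new PendingFileLifecycle();
        sink.closePartFile("part-0-19"); // illustrative file name
        sink.snapshotState(1L);
        System.out.println("pending before notification: " + sink.getPendingFiles());
        sink.notifyCheckpointComplete(1L);
        System.out.println("finalized after notification: " + sink.getFinalizedFiles());
    }
}
```

In this model a file stays pending for as long as no completion notification
arrives for its checkpoint, and that is the part the DEBUG logs should help
to confirm or rule out.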

In order to further debug the problem, it would be really helpful to get
access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:

> Hi Vishal,
>
> I have the same concern about savepointing in BucketingSink.
> As for your question, I think that before the pending files get cleared in
> handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>
> I'm looking into this part of the source code now, since we are
> experiencing some unclosed files after checkpointing.
> It would be great if you could share anything new you find about
> your problem, as it might help with ours.
>
> Best regards,
> Mu
>
> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>
>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>
>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>
>>
>> This is strange. We had a few retries because of an OOM on one of the TMs,
>> and now we see this situation: two files (one on either side) that were
>> dealt with fine, but a dangling .pending file between them. I am sure this
>> is not what is meant to happen. I think we have an edge condition, and
>> looking at the code it is not obvious. Maybe someone who wrote the code can
>> shed some light on how this can happen.
>>
>>
>>
>>
>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Without --allowNonRestoredState, on a suspend/resume we do see the
>>> length file along with the finalized file (finalized during resume):
>>>
>>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57
>>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>
>>> That makes much more sense.
>>>
>>> I guess we should document --allowNonRestoredState better? It seems it
>>> actually drops state?
>>>
>>>
>>>
>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> This is 1.4, BTW. I am not sure that I am reading this correctly, but
>>>> the lifecycle of cancel/resume is 2 steps:
>>>>
>>>>
>>>>
>>>> 1. Cancel job with SP
>>>>
>>>>
>>>> closeCurrentPartFile
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>
>>>> is called from close()
>>>>
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>
>>>>
>>>> and that moves the files to the pending state. I presume that this is
>>>> called when one does a cancel.
>>>>
>>>>
>>>>
>>>> 2. The restore on resume
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>
>>>> calls
>>>>
>>>> handleRestoredBucketState
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>
>>>> which clears the pending files from state without finalizing them?
>>>>
>>>>
>>>>
>>>> That does not seem to be right. I must be reading the code totally
>>>> wrong ?
>>>>
>>>> I am also not sure whether --allowNonRestoredState skips restoring
>>>> the state. At least
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>>> is not exactly clear about what it does if we add an operator (GDF, I
>>>> think, will add a new operator to the DAG without state even if it is
>>>> stateful; in my case the Map operator is not even stateful).
>>>>
>>>>
>>>> Thanks and please bear with me if this is all something pretty simple.
>>>>
>>>> Vishal
>>>>
>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> What should be the behavior of BucketingSink vis-a-vis state (pending,
>>>>> in-progress and finalization) when we suspend and resume?
>>>>>
>>>>> So I did this:
>>>>>
>>>>> * I had a pipe writing to HDFS, which I suspended and resumed using
>>>>> --allowNonRestoredState, since I had added a harmless (stateless)
>>>>> MapOperator.
>>>>>
>>>>>
>>>>> * I see that the file on HDFS that was being written to (before the
>>>>> cancel with savepoint) goes into a pending state: _part-0-21.pending
>>>>>
>>>>>
>>>>> * I see a new file being written to in the resumed pipe
>>>>> _part-0-22.in-progress.
>>>>>
>>>>>
>>>>> What I do not see is the file _part-0-21.pending being finalized
>>>>> (as in renamed to just part-0-21). I would have assumed that would be
>>>>> the case in this controlled suspend/resume circumstance. Further, it is
>>>>> a rename, and an hdfs mv is not an expensive operation.
>>>>>
>>>>>
>>>>>
>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>>
>>>>> Vishal
>>>>>
>>>>
>>>>
>>>
>>
>
