Hi,

Sorry for the confusion. Yes, the framework (Flink) currently does not do any 
cleanup of pending files.
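
For the periodic cleanup suggested further down in this thread, something along these lines might serve as a starting point. It is only a minimal sketch, not part of Flink: the function names are made up for illustration, it assumes the bucket directories are reachable as a local or mounted path (on HDFS you would go through an HDFS client instead), and it assumes the default ".pending" suffix seen in the listings below. The age threshold is also an assumption; it should be comfortably longer than any period in which a pending file could still be finalized by a recovery.

```python
import time
from pathlib import Path
from typing import List, Optional


def find_stale_pending(root: Path, max_age_seconds: float,
                       now: Optional[float] = None) -> List[Path]:
    """Return *.pending files under root last modified before the cutoff."""
    now = time.time() if now is None else now
    cutoff = now - max_age_seconds
    stale = []
    for path in root.rglob("*.pending"):
        # Only plain files whose modification time is older than the cutoff.
        if path.is_file() and path.stat().st_mtime < cutoff:
            stale.append(path)
    return sorted(stale)


def delete_stale_pending(root: Path, max_age_seconds: float,
                         dry_run: bool = True) -> List[Path]:
    """List stale pending files; delete them only when dry_run is False."""
    stale = find_stale_pending(root, max_age_seconds)
    if not dry_run:
        for path in stale:
            path.unlink()
    return stale
```

Running it with dry_run=True first and reviewing the returned list seems prudent, since a pending file that still belongs to a restorable checkpoint must not be deleted.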

Best,
Aljoscha

> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> >> You should only have these dangling pending files after a failure-recovery 
> >> cycle, as you noticed. My suggestion would be to periodically clean up 
> >> older pending files.
> 
> I am a little confused. Is that something the framework should do, or should 
> we do it as part of some cleanup job?
> 
> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> 
> The BucketingSink does not clean up pending files on purpose. In a 
> distributed setting, and especially with rescaling of Flink operators, it is 
> sufficiently hard to figure out which of the pending files you actually can 
> delete and which of them you have to leave because they will get moved to 
> "final" as part of recovering from a checkpoint on some other parallel 
> instance of the sink.
> 
> You should only have these dangling pending files after a failure-recovery 
> cycle, as you noticed. My suggestion would be to periodically clean up older 
> pending files.
> 
> Best,
> Aljoscha
> 
> 
>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>> 
>> Hi Vishal,
>> 
>> the pending files should indeed eventually get finalized. This happens on a 
>> checkpoint-complete notification. Thus, what you report does not seem right. 
>> Maybe Aljoscha can shed a bit more light on the problem.
>> 
>> In order to further debug the problem, it would be really helpful to get 
>> access to DEBUG log files of a TM which runs the BucketingSink.
>> 
>> Cheers,
>> Till
>> 
>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>> Hi Vishal,
>> 
>> I have the same concern about savepointing in BucketingSink.
>> As for your question, I think that before the pending files get cleared in 
>> handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>> 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>> 
>> I'm looking into this part of the source code now, since we are experiencing 
>> some unclosed files after checkpointing.
>> It would be great if you could share more if you find something new about 
>> your problem, which might help with ours.
>> 
>> Best regards,
>> Mu
>> 
>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>> 
>> 
>> This is strange. We had a few retries because of an OOM on one of the TMs, 
>> and we see this situation: the two files on either side were dealt with 
>> fine, but there is a dangling .pending file. I am sure this is not what is 
>> meant to happen. I think we have an edge condition, and looking at the code 
>> it is not obvious. Maybe someone who wrote the code can shed some light on 
>> how this can happen.
>> 
>> 
>> 
>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Without --allowNonRestoredState, on a suspend/resume we do see the length 
>> file along with the finalized file (finalized during resume):
>> 
>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>> 
>> That makes much more sense.
>> 
>> I guess we should document --allowNonRestoredState better? It seems it 
>> actually drops state?
>> 
>> 
>> 
>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> This is 1.4, BTW. I am not sure that I am reading this correctly, but the 
>> lifecycle of cancel/resume is 2 steps:
>> 
>> 1. Cancel job with savepoint
>> 
>> closeCurrentPartFile
>> 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>> 
>> is called from close()
>> 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>> 
>> and that moves files to the pending state. That, I would presume, is called 
>> when one does a cancel.
>> 
>> 
>> 
>> 2. The restore on resume
>> 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>> 
>> calls
>> 
>> handleRestoredBucketState
>> 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>> 
>> which clears the pending files from state without finalizing them?
>> 
>> That does not seem to be right. I must be reading the code totally wrong?
>> 
>> I am also not sure whether --allowNonRestoredState skips restoring the 
>> state. At least 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>> is not exactly clear about what it does if we add an operator (GDF, I think, 
>> will add a new operator to the DAG without state even if stateful; in my 
>> case the Map operator is not even stateful).
>> 
>> 
>> Thanks and please bear with me if this is all something pretty simple.
>> 
>> Vishal
>> 
>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> What should be the behavior of BucketingSink vis-a-vis state (pending, 
>> in-progress and finalization) when we suspend and resume?
>> 
>> So I did this:
>> 
>> * I had a pipeline writing to HDFS, which I suspended and resumed using 
>> --allowNonRestoredState, because I had added a harmless (stateless) 
>> MapOperator.
>> 
>> * I see that the file on HDFS that was being written to (before the cancel 
>> with savepoint) goes into a pending state: _part-0-21.pending
>> 
>> * I see a new file being written to in the resumed pipeline: 
>> _part-0-22.in-progress
>> 
>> What I do not see is the file _part-0-21.pending being finalized (as in 
>> renamed to just part-0-21). I would have assumed that would be the case in 
>> this controlled suspend/resume circumstance. Further, it is only a rename, 
>> and an HDFS mv is not an expensive operation.
>> 
>> Am I understanding the process correctly, and if yes, any pointers?
>> 
>> Regards,
>> 
>> Vishal
> 
> 
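
To make the lifecycle discussed in this thread a bit more concrete, here is a toy model of it. It is not Flink's code: the class ToySink and its methods are invented for illustration, and it only assumes what the thread states, namely that closed part files become pending, that pending files are finalized on a checkpoint-complete notification, and that state restored from a checkpoint knows nothing about files that became pending afterwards. Under those assumptions, a file that goes pending between the last completed checkpoint and a failure is left dangling.

```python
import copy
from typing import Dict, List, Optional


class ToySink:
    """Toy model of a part-file lifecycle: pending -> final."""

    def __init__(self, disk: Optional[Dict[str, str]] = None) -> None:
        # "disk" maps part name -> state and survives a simulated crash.
        self.disk: Dict[str, str] = {} if disk is None else disk
        self.pending: List[str] = []
        self.pending_per_checkpoint: Dict[int, List[str]] = {}

    def write_and_close(self, part: str) -> None:
        """Write a part file and close it, which leaves it pending."""
        self.disk[part] = "pending"
        self.pending.append(part)

    def snapshot(self, checkpoint_id: int) -> Dict[int, List[str]]:
        """Record current pending files under this checkpoint; return state."""
        self.pending_per_checkpoint[checkpoint_id] = self.pending
        self.pending = []
        return copy.deepcopy(self.pending_per_checkpoint)

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        """Finalize pending files of all checkpoints up to checkpoint_id."""
        for cp in sorted(self.pending_per_checkpoint):
            if cp <= checkpoint_id:
                for part in self.pending_per_checkpoint.pop(cp):
                    self.disk[part] = "final"

    @classmethod
    def restore(cls, disk: Dict[str, str],
                state: Dict[int, List[str]]) -> "ToySink":
        """Restore from checkpointed state; finalize the files it records."""
        sink = cls(disk)
        for parts in state.values():
            for part in parts:
                sink.disk[part] = "final"
        return sink


def demo() -> List[str]:
    sink = ToySink()
    sink.write_and_close("part-0-18")
    state = sink.snapshot(1)
    sink.notify_checkpoint_complete(1)
    sink.write_and_close("part-0-19")  # pending, but in no checkpoint yet
    # Simulated failure: in-memory state is lost, the disk is not.
    restored = ToySink.restore(sink.disk, state)
    restored.write_and_close("part-0-20")
    restored.snapshot(2)
    restored.notify_checkpoint_complete(2)
    # Files still pending on disk that no state refers to any more.
    return [p for p, s in restored.disk.items() if s == "pending"]
```

In this model, demo() returns the one part file that went pending after the last snapshot; no later checkpoint refers to it, which is why only an external cleanup can remove it.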
