I would expect that to be possible as well, yes.

> On 21. Apr 2018, at 17:33, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> >> After the savepoint state has been written, the sink might start new 
> >> .in-progress files. These files are not part of the savepoint but renamed 
> >> to .pending in close().
> >> On restore all pending files that are part of the savepoint are moved into 
> >> final state (and possibly truncated). See handlePendingInProgressFiles() 
> >> method.
> >> Pending files that are not part of the savepoint (because they were 
> >> created later between taking the savepoint and shutting the job down) are 
> >> not touched and remain as .pending files.
> 
> >> These should be the .pending files that you observe. Since they contain 
> >> data that is not part of the savepoint, it should be safe to delete them.
> >> If you keep them, you will have at-least-once output.
> 
> 
> Is the above also possible with .in-progress files? We had a situation where 
> we saw such a dangling file after a restart on error.
> 
> 
> On Wed, Feb 21, 2018 at 8:52 AM, Vishal Santoshi <vishal.santo...@gmail.com 
> <mailto:vishal.santo...@gmail.com>> wrote:
> Thank you Fabian, 
> 
>     What is more important (and I think you might have addressed it in your 
> post, so sorry for being a little obtuse) is that deleting them does not 
> violate "at-least-once" delivery. If that is definite, then we are fine 
> with it, though we will test it further.
> 
> Thanks and Regards.
> 
> 
> 
> On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <fhue...@gmail.com 
> <mailto:fhue...@gmail.com>> wrote:
> Hi Vishal, hi Mu,
> 
> After the savepoint state has been written, the sink might start new 
> .in-progress files. These files are not part of the savepoint but renamed to 
> .pending in close().
> On restore all pending files that are part of the savepoint are moved into 
> final state (and possibly truncated). See handlePendingInProgressFiles() 
> method.
> Pending files that are not part of the savepoint (because they were created 
> later between taking the savepoint and shutting the job down) are not touched 
> and remain as .pending files.
> 
> These should be the .pending files that you observe. Since they contain data 
> that is not part of the savepoint, it should be safe to delete them.
> If you keep them, you will have at-least-once output.
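
The restore behaviour described above can be sketched with plain collections. This is only an illustration of the described rule, not the BucketingSink code: the class and method names (RestoreSketch, split) are hypothetical, and the set of pending files recorded in the savepoint is assumed to be given.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative model only; names are hypothetical and not part of Flink.
final class RestoreSketch {

    /** Pending files found on disk, split by savepoint membership. */
    static final class Split {
        final List<String> toFinalize = new ArrayList<>(); // recorded in the savepoint
        final List<String> dangling = new ArrayList<>();   // created after the savepoint
    }

    static Split split(List<String> pendingOnDisk, Set<String> pendingInSavepoint) {
        Split result = new Split();
        for (String file : pendingOnDisk) {
            if (pendingInSavepoint.contains(file)) {
                // Part of the savepoint: moved to its final state on restore.
                result.toFinalize.add(file);
            } else {
                // Not part of the savepoint: left untouched as a .pending file.
                result.dangling.add(file);
            }
        }
        return result;
    }
}
```

Under this model, the files in `dangling` are the ones that hold data not covered by the savepoint.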
> 
> Best, Fabian
> 
> 
> 2018-02-21 5:04 GMT+01:00 Mu Kong <kong.mu....@gmail.com 
> <mailto:kong.mu....@gmail.com>>:
> Hi Aljoscha,
> 
> Thanks for confirming the fact that Flink doesn't clean up pending files.
> Is it safe to clean up (remove) all the pending files after a cancel (with or 
> without savepointing) or a failover?
> If we do that, will we lose some data?
> 
> Thanks!
> 
> Best,
> Mu
> 
> 
> 
> 
> On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <vishal.santo...@gmail.com 
> <mailto:vishal.santo...@gmail.com>> wrote:
> Sorry, but I just wanted to confirm that the "at-least-once" delivery 
> assertion holds true if there is a dangling pending file?
> 
> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <vishal.santo...@gmail.com 
> <mailto:vishal.santo...@gmail.com>> wrote:
> That is fine, as long as Flink assures at-least-once semantics? 
> 
> If the contents of a .pending file are assured, through the turbulence 
> (restarts etc.), to be in another file, then anything starting with "_" 
> (underscore) will by default be ignored by Hadoop (Hive, MR, etc.).
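
As far as I know, the default input path filter in Hadoop's FileInputFormat skips file names that start with "_" or ".". A minimal sketch of that naming rule follows; it does not call Hadoop, and the class and method names are hypothetical.

```java
// Mirrors the "hidden file" naming rule only; does not use any Hadoop API.
final class HiddenFileCheck {

    /** True if a file with this name would be skipped under the rule above. */
    static boolean isIgnoredByDefault(String fileName) {
        return fileName.startsWith("_") || fileName.startsWith(".");
    }
}
```

With this rule, `_part-0-19.pending` would be skipped by readers while a finalized `part-0-19` would be picked up. Tools that list directories with their own filter may behave differently.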
> 
> 
> 
> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Hi,
> 
> Sorry for the confusion. The framework (Flink) does currently not do any 
> cleanup of pending files, yes.
> 
> Best,
> Aljoscha
> 
> 
>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com 
>> <mailto:vishal.santo...@gmail.com>> wrote:
>> 
>> >> You should only have these dangling pending files after a 
>> >> failure-recovery cycle, as you noticed. My suggestion would be to 
>> >> periodically clean up older pending files.
>> 
>> A little confused. Is that what the framework should do, or us as part of 
>> some cleanup job ?
>> 
>> 
>> 
>> 
>> 
>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> Hi,
>> 
>> The BucketingSink does not clean up pending files on purpose. In a 
>> distributed setting, and especially with rescaling of Flink operators, it is 
>> sufficiently hard to figure out which of the pending files you actually can 
>> delete and which of them you have to leave because they will get moved to 
>> "final" as part of recovering from a checkpoint on some other parallel 
>> instance of the sink.
>> 
>> You should only have these dangling pending files after a failure-recovery 
>> cycle, as you noticed. My suggestion would be to periodically clean up older 
>> pending files.
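
A periodic cleanup along these lines could start from a selection step like the sketch below. It assumes the directory listing (path and modification time) has already been fetched from the file system by other means; the class name, method name and age threshold are hypothetical, and the sketch only selects candidates without deleting anything.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the listing is assumed to come from the file system client.
final class StalePendingSelector {

    /** Returns, in sorted order, the .pending paths last modified more than minAge before now. */
    static List<String> selectStale(Map<String, Instant> modifiedByPath,
                                    Instant now,
                                    Duration minAge) {
        Instant cutoff = now.minus(minAge);
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : modifiedByPath.entrySet()) {
            boolean isPending = entry.getKey().endsWith(".pending");
            boolean isOld = entry.getValue().isBefore(cutoff);
            if (isPending && isOld) {
                stale.add(entry.getKey());
            }
        }
        Collections.sort(stale);
        return stale;
    }
}
```

The threshold should be comfortably larger than the longest expected failure-recovery cycle, so that a file which may still be finalized by another parallel instance is not selected.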
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org 
>>> <mailto:trohrm...@apache.org>> wrote:
>>> 
>>> Hi Vishal,
>>> 
>>> the pending files should indeed eventually get finalized. This happens on 
>>> a checkpoint-complete notification. Thus, what you report does not seem right. 
>>> Maybe Aljoscha can shed a bit more light on the problem.
>>> 
>>> In order to further debug the problem, it would be really helpful to get 
>>> access to DEBUG log files of a TM which runs the BucketingSink.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com 
>>> <mailto:kong.mu....@gmail.com>> wrote:
>>> Hi Vishal,
>>> 
>>> I have the same concern about savepointing in BucketingSink.
>>> As for your question, I think that before the pending files get cleared in 
>>> handleRestoredBucketState, 
>>> they are finalized in notifyCheckpointComplete:
>>> 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>> 
>>> I'm looking into this part of the source code now, since we are 
>>> experiencing some unclosed files after checkpointing.
>>> It would be great if you could share more if you find something new about 
>>> your problem, which might help with ours.
>>> 
>>> Best regards,
>>> Mu
>>> 
>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi 
>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>> 
>>> 
>>> This is strange. We had a few retries because of an OOM on one of the TMs, 
>>> and we see this situation: two files (on either side) that were dealt with 
>>> fine, but a dangling .pending file between them. I am sure this is not what 
>>> is meant to happen. I think we have an edge condition, and looking at the 
>>> code it is not obvious. Maybe someone who wrote the code can shed some 
>>> light on how this can happen.
>>> 
>>> 
>>> 
>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com 
>>> <mailto:vishal.santo...@gmail.com>> wrote:
>>> Without --allowNonRestoredState, on a suspend/resume we do see the 
>>> .valid-length file along with the finalized file (finalized during resume): 
>>> 
>>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>> 
>>> That makes much more sense. 
>>> 
>>> I guess we should document --allowNonRestoredState better? It seems it 
>>> actually drops state?
>>> 
>>> 
>>> 
>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com 
>>> <mailto:vishal.santo...@gmail.com>> wrote:
>>> This is 1.4, BTW. I am not sure that I am reading this correctly, but the 
>>> lifecycle of cancel/resume has 2 steps:
>>> 
>>> 
>>> 
>>> 1. Cancel job with SP
>>> 
>>> 
>>> closeCurrentPartFile
>>> 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>> 
>>> is called from close()
>>> 
>>> 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>> 
>>> 
>>> and that moves files to the pending state. That, I would presume, is called 
>>> when one does a cancel.
>>> 
>>> 
>>> 
>>> 2. The restore on resume 
>>> 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>> 
>>> calls 
>>> 
>>> handleRestoredBucketState
>>> 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>> 
>>> which clears the pending files from state without finalizing them?
>>> 
>>> 
>>> 
>>> That does not seem to be right. I must be reading the code totally wrong?
>>> 
>>> I am also not sure whether --allowNonRestoredState skips restoring the 
>>> state. At least 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>> is not exactly clear about what it does if we add an operator (GDF, I think, 
>>> will add a new operator to the DAG without state even if stateful; in my 
>>> case the Map operator is not even stateful).
>>> 
>>> 
>>> Thanks and please bear with me if this is all something pretty simple.
>>> 
>>> Vishal
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com 
>>> <mailto:vishal.santo...@gmail.com>> wrote:
>>> What should be the behavior of BucketingSink vis-à-vis state (pending, 
>>> in-progress and finalization) when we suspend and resume?
>>> 
>>> So I did this:
>>> 
>>> * I had a pipe writing to HDFS, which I suspended and resumed using 
>>> --allowNonRestoredState, as I had added a harmless (stateless) 
>>> MapOperator.
>>> 
>>> * I see that the file on HDFS that was being written to (before the cancel 
>>> with savepoint) goes into a pending state: _part-0-21.pending 
>>> 
>>> * I see a new file being written to in the resumed pipe: 
>>> _part-0-22.in-progress.
>>> 
>>> What I do not see is the file _part-0-21.pending being finalized (as 
>>> in renamed to just part-0-21). I would have assumed that would be the case 
>>> in this controlled suspend/resume circumstance. Further, it is a rename, and 
>>> an HDFS mv is not an expensive operation.
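
The naming scheme seen in this thread can be written down as a small sketch. The "_" prefix and the ".in-progress", ".pending" and ".valid-length" suffixes are taken from the file names quoted here and may be configured differently in a given job; the class and method names are hypothetical.

```java
// Classifies part files by the naming conventions observed in this thread.
final class PartFileNames {

    enum State { IN_PROGRESS, PENDING, VALID_LENGTH, FINAL }

    static State classify(String name) {
        if (name.endsWith(".in-progress")) return State.IN_PROGRESS;
        if (name.endsWith(".pending")) return State.PENDING;
        if (name.endsWith(".valid-length")) return State.VALID_LENGTH;
        return State.FINAL;
    }

    /** Name a pending file would get once finalized: no "_" prefix, no ".pending" suffix. */
    static String finalName(String pendingName) {
        if (classify(pendingName) != State.PENDING) {
            throw new IllegalArgumentException("not a pending file: " + pendingName);
        }
        String base = pendingName.substring(0, pendingName.length() - ".pending".length());
        return base.startsWith("_") ? base.substring(1) : base;
    }
}
```

For the file above, `finalName("_part-0-21.pending")` gives `part-0-21`, which is the rename I expected to see after the resume.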
>>> 
>>> 
>>> Am I understanding the process correctly, and if yes, any pointers?
>>> 
>>> Regards,
>>> 
>>> Vishal
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> 
> 
> 
> 
