Thank you Fabian. What is more important (and I think you might have addressed it in your post, so sorry for being a little obtuse) is that deleting them does not violate "at-least-once" delivery. And if that is definite, then we are fine with it, though we will test it further.
Thanks and Regards.

On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vishal, hi Mu,
>
> After the savepoint state has been written, the sink might start new
> .in-progress files. These files are not part of the savepoint but are
> renamed to .pending in close(). On restore, all pending files that are
> part of the savepoint are moved into their final state (and possibly
> truncated). See the handlePendingInProgressFiles() method. Pending files
> that are not part of the savepoint (because they were created later,
> between taking the savepoint and shutting the job down) are not touched
> and remain as .pending files.
>
> These should be the .pending files that you observe. Since they contain
> data that is not part of the savepoint, it should be safe to delete them.
> If you keep them, you will have at-least-once output.
>
> Best,
> Fabian
>
> 2018-02-21 5:04 GMT+01:00 Mu Kong <kong.mu....@gmail.com>:
>
>> Hi Aljoscha,
>>
>> Thanks for confirming the fact that Flink doesn't clean up pending files.
>> Is it safe to clean (remove) all the pending files after a cancel (with
>> or without savepointing) or a failover? If we do that, will we lose some
>> data?
>>
>> Thanks!
>>
>> Best,
>> Mu
>>
>> On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Sorry, but I just wanted to confirm: is the assertion of "at-least-once"
>>> delivery still true if there is a dangling pending file?
>>>
>>> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> That is fine, as long as Flink assures at-least-once semantics?
>>>>
>>>> If the contents of a .pending file are, through the turbulence
>>>> (restarts etc.), assured to be in another file, then anything starting
>>>> with an underscore ("_") will by default be ignored by Hadoop (Hive,
>>>> MR, etc.).
>>>>
>>>> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Sorry for the confusion. The framework (Flink) currently does not do
>>>>> any cleanup of pending files, yes.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>> >> You should only have these dangling pending files after a
>>>>> >> failure-recovery cycle, as you noticed. My suggestion would be to
>>>>> >> periodically clean up older pending files.
>>>>>
>>>>> A little confused. Is that what the framework should do, or us as part
>>>>> of some cleanup job?
>>>>>
>>>>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The BucketingSink does not clean up pending files on purpose. In a
>>>>>> distributed setting, and especially with rescaling of Flink operators,
>>>>>> it is sufficiently hard to figure out which of the pending files you
>>>>>> actually can delete and which of them you have to leave, because they
>>>>>> will get moved to "final" as part of recovering from a checkpoint on
>>>>>> some other parallel instance of the sink.
>>>>>>
>>>>>> You should only have these dangling pending files after a
>>>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>>>> periodically clean up older pending files.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> The pending files should indeed get finalized eventually. This
>>>>>> happens on a checkpoint-complete notification. Thus, what you report
>>>>>> seems not right. Maybe Aljoscha can shed a bit more light on the
>>>>>> problem.
>>>>>>
>>>>>> In order to further debug the problem, it would be really helpful to
>>>>>> get access to DEBUG log files of a TM which runs the BucketingSink.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> I have the same concern about savepointing in BucketingSink.
>>>>>>> As for your question: I think that, before the pending files get
>>>>>>> cleared in handleRestoredBucketState, they are finalized in
>>>>>>> notifyCheckpointComplete:
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>>>>>
>>>>>>> I'm looking into this part of the source code now, since we are
>>>>>>> experiencing some unclosed files after checkpointing.
>>>>>>> It would be great if you could share more if you find something new
>>>>>>> about your problem, which might help with our problem.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Mu
>>>>>>>
>>>>>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> -rw-r--r--  3 root hadoop        11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>>>>>> -rw-r--r--  3 root hadoop  54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>>>>>> -rw-r--r--  3 root hadoop        11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>>>>>
>>>>>>>> This is strange. We had a few retries because of an OOM on one of
>>>>>>>> the TMs, and we see this situation: two files (on either side) that
>>>>>>>> were dealt with fine, but a dangling .pending file in between. I am
>>>>>>>> sure this is not how it is meant to be. I think we have an edge
>>>>>>>> condition, and looking at the code it is not obvious. Maybe someone
>>>>>>>> who wrote the code can shed some light on how this can happen.
>>>>>>>>
>>>>>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Without --allowNonRestoredState, on a suspend/resume we do see the
>>>>>>>>> length file along with the finalized file (finalized during resume):
>>>>>>>>>
>>>>>>>>> -rw-r--r--  3 root hadoop        10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>>>>>
>>>>>>>>> That makes much more sense.
>>>>>>>>>
>>>>>>>>> I guess we should document --allowNonRestoredState better? It seems
>>>>>>>>> it actually drops state?
>>>>>>>>>
>>>>>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> This is 1.4, BTW. I am not sure that I am reading this correctly,
>>>>>>>>>> but the lifecycle of cancel/resume is two steps:
>>>>>>>>>>
>>>>>>>>>> 1. Cancel the job with a savepoint.
>>>>>>>>>>
>>>>>>>>>> closeCurrentPartFile
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>>>>> is called from close()
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>>>>> and that moves files to the pending state. That, I would presume,
>>>>>>>>>> is called when one does a cancel.
>>>>>>>>>>
>>>>>>>>>> 2. The restore on resume
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>>>>> calls handleRestoredBucketState
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>>>>> which clears the pending files from state without finalizing them?
>>>>>>>>>>
>>>>>>>>>> That does not seem to be right. I must be reading the code totally
>>>>>>>>>> wrong?
>>>>>>>>>>
>>>>>>>>>> I am also not sure whether --allowNonRestoredState is skipping
>>>>>>>>>> getting the state. At least
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>>>>>>>>> is not exactly clear on what it does if we add an operator (GDF, I
>>>>>>>>>> think, will add a new operator in the DAG without state even if
>>>>>>>>>> stateful; in my case the Map operator is not even stateful).
>>>>>>>>>>
>>>>>>>>>> Thanks, and please bear with me if this is all something pretty
>>>>>>>>>> simple.
>>>>>>>>>>
>>>>>>>>>> Vishal
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What should be the behavior of BucketingSink vis-a-vis state
>>>>>>>>>>> (pending, in-progress and finalization) when we suspend and
>>>>>>>>>>> resume?
>>>>>>>>>>>
>>>>>>>>>>> So I did this:
>>>>>>>>>>>
>>>>>>>>>>> * I had a pipe writing to HDFS, suspended and resumed it using
>>>>>>>>>>>   --allowNonRestoredState, as I had added a harmless Map operator
>>>>>>>>>>>   (stateless).
>>>>>>>>>>>
>>>>>>>>>>> * I see that the file on HDFS being written to (before the cancel
>>>>>>>>>>>   with savepoint) goes into a pending state: _part-0-21.pending.
>>>>>>>>>>>
>>>>>>>>>>> * I see a new file being written to in the resumed pipe:
>>>>>>>>>>>   _part-0-22.in-progress.
>>>>>>>>>>>
>>>>>>>>>>> What I do not see is the file _part-0-21.pending being finalized
>>>>>>>>>>> (as in renamed to just part-0-21). I would have assumed that would
>>>>>>>>>>> be the case in this controlled suspend/resume circumstance.
>>>>>>>>>>> Further, it is a rename, and an HDFS mv is not an expensive
>>>>>>>>>>> operation.
>>>>>>>>>>>
>>>>>>>>>>> Am I understanding the process correctly, and if yes, any
>>>>>>>>>>> pointers?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Vishal
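
The file names in the listings above (_part-0-19.pending, _part-0-18.valid-length, part-0-21) all come from the BucketingSink's default prefixes and suffixes; the leading underscore is what lets Hadoop, Hive and MapReduce ignore files that are not yet finalized, as Vishal notes. Below is a minimal sketch of such a sink with those defaults written out explicitly. The class name, base path and bucketer format are assumptions chosen to mirror the dt=YYYY-MM-DD layout in the listings, not a copy of the actual job.

    // Sketch only: a BucketingSink whose default naming produces the files seen above.
    // In-progress files are renamed to .pending when a part file is closed (batch-size
    // rollover or close()), and .pending files are renamed to their final name on the
    // checkpoint-complete notification (notifyCheckpointComplete), per the thread.
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

    public class SessionSinkFactory {
        public static BucketingSink<String> buildSink() {
            BucketingSink<String> sink =
                new BucketingSink<>("hdfs:///kraken/sessions_uuid_csid_v3");
            sink.setBucketer(new DateTimeBucketer<String>("'dt='yyyy-MM-dd"));
            sink.setPartPrefix("part");              // final files: part-<subtask>-<count>
            sink.setInProgressPrefix("_");           // _part-0-22.in-progress while open
            sink.setInProgressSuffix(".in-progress");
            sink.setPendingPrefix("_");              // _part-0-21.pending until the next
            sink.setPendingSuffix(".pending");       // successful checkpoint finalizes it
            return sink;
        }
    }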
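
Aljoscha's suggestion to periodically clean up older pending files could look roughly like the sketch below. It is not part of Flink: the class name, base path and age threshold are placeholders, and it assumes the default "_*.pending" naming shown above.

    // Hypothetical maintenance pass: delete stale .pending files left behind by
    // earlier failure/recovery or savepoint-and-cancel cycles. Assumes the Hadoop
    // configuration (core-site.xml / hdfs-site.xml) is on the classpath.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PendingFileCleaner {
        public static void main(String[] args) throws Exception {
            Path bucketDir = new Path("/kraken/sessions_uuid_csid_v3/dt=2018-02-14");
            // Only touch files old enough that no in-flight restore can still
            // move them to their final state; 24h is an arbitrary example.
            long cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000;

            FileSystem fs = FileSystem.get(new Configuration());
            for (FileStatus status : fs.listStatus(bucketDir)) {
                String name = status.getPath().getName();
                boolean stalePending = name.startsWith("_")
                        && name.endsWith(".pending")
                        && status.getModificationTime() < cutoff;
                if (stalePending) {
                    fs.delete(status.getPath(), false);
                }
            }
        }
    }

Per Fabian's explanation, deleting such leftovers does not lose data as long as the job was resumed from the savepoint in question, because the same records are reprocessed and written again after the restore; keeping them only risks duplicate output if something downstream ever reads the underscore-prefixed files.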
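
For Till's request, enabling DEBUG logging for just the sink class on the TaskManagers should be a reasonable starting point for tracing the in-progress/pending/finalize transitions. This is a sketch assuming a stock Flink 1.4 distribution with its default log4j 1.x setup; the file location and logging backend may differ in a customized deployment.

    # conf/log4j.properties on the TaskManagers (requires a TM restart to take effect)
    log4j.logger.org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink=DEBUG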
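
For reference, the cancel-with-savepoint / resume cycle discussed above maps onto the 1.4 CLI roughly as follows; the job id, paths and jar name are placeholders.

    # Cancel with a savepoint; the savepoint path is printed on success.
    bin/flink cancel -s hdfs:///savepoints <jobId>

    # Resume from it. -n / --allowNonRestoredState only allows the restore to
    # proceed when some savepoint state cannot be matched to an operator in the
    # new job graph; that unmatched state is then dropped.
    bin/flink run -s hdfs:///savepoints/savepoint-<id> -n -d my-job.jar

One hedged explanation for the un-finalized _part-0-21.pending above: if the operators were not given explicit uid()s, inserting the new Map can shift the auto-generated operator ids, so the sink's state may fall into exactly that unmatched category and be dropped under this flag, in which case the pending file is never finalized on resume.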