Hi Aljoscha,

Thanks for confirming that Flink doesn't clean up pending files. Is it safe to remove all the pending files after a cancel (with or without a savepoint) or a failover? If we do that, will we lose any data?
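To make the question concrete, here is a minimal sketch of the kind of periodic cleanup suggested further down the thread ("periodically clean up older pending files"). It is only an illustration under assumptions: it walks a local directory tree with Python's pathlib rather than HDFS, the function name `find_stale_pending` and the age threshold are hypothetical, and it only lists candidates instead of deleting them, since whether deletion is safe is exactly what I am asking.

```python
import time
from pathlib import Path


def find_stale_pending(root, max_age_seconds, now=None):
    """List '_*.pending' files under root that are older than max_age_seconds.

    Assumption: a local (or locally mounted) directory tree stands in for
    HDFS here. Nothing is deleted; the caller decides what to do with the
    returned paths.
    """
    now = time.time() if now is None else now
    stale = []
    # rglob descends into bucket directories such as dt=2018-02-14/
    for path in Path(root).rglob("_*.pending"):
        age = now - path.stat().st_mtime
        if path.is_file() and age > max_age_seconds:
            stale.append(path)
    return sorted(stale)
```

For example, `find_stale_pending("/data/sessions", 24 * 3600)` would return the pending files that have not been modified for more than a day; the path here is made up.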
Thanks!

Best,
Mu

On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Sorry, but I just wanted to confirm: does the "at-least-once" delivery
> assertion still hold if there is a dangling pending file?
>
> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> That is fine, as long as Flink assures at-least-once semantics?
>>
>> If the contents of a .pending file are assured, through the turbulence
>> (restarts etc.), to be in another file, then anything starting with "_"
>> (underscore) will by default be ignored by Hadoop (Hive, MR, etc.).
>>
>> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Sorry for the confusion. Yes, the framework (Flink) currently does not
>>> do any cleanup of pending files.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>> >> You should only have these dangling pending files after a
>>> >> failure-recovery cycle, as you noticed. My suggestion would be to
>>> >> periodically clean up older pending files.
>>>
>>> A little confused. Is that what the framework should do, or should we
>>> do it as part of some cleanup job?
>>>
>>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> The BucketingSink does not clean up pending files on purpose. In a
>>>> distributed setting, and especially with rescaling of Flink operators, it
>>>> is sufficiently hard to figure out which of the pending files you actually
>>>> can delete and which of them you have to leave because they will get moved
>>>> to "final" as part of recovering from a checkpoint on some other parallel
>>>> instance of the sink.
>>>>
>>>> You should only have these dangling pending files after a
>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>> periodically clean up older pending files.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> the pending files should indeed eventually get finalized. This happens
>>>> on a checkpoint complete notification. Thus, what you report does not
>>>> seem right. Maybe Aljoscha can shed a bit more light on the problem.
>>>>
>>>> In order to further debug the problem, it would be really helpful to
>>>> get access to the DEBUG log files of a TM which runs the BucketingSink.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> I have the same concern about savepointing in BucketingSink.
>>>>> As for your question, I think that before the pending files get
>>>>> cleared in handleRestoredBucketState, they are finalized in
>>>>> notifyCheckpointComplete:
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>>>
>>>>> I'm looking into this part of the source code now, since we are
>>>>> experiencing some unclosed files after checkpointing.
>>>>> It would be great if you could share more if you find something new
>>>>> about your problem, which might help with our problem.
>>>>>
>>>>> Best regards,
>>>>> Mu
>>>>>
>>>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>>>> -rw-r--r-- 3 root hadoop 54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>>>
>>>>>> This is strange. We had a few retries because of an OOM on one of the
>>>>>> TMs and we see this situation: 2 files (on either side) that were
>>>>>> dealt with fine, but a dangling .pending file. I am sure this is not
>>>>>> what is meant to be. I think we have an edge condition, and looking at
>>>>>> the code it is not obvious. Maybe someone who wrote the code can shed
>>>>>> some light on how this can happen.
>>>>>>
>>>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Without --allowNonRestoredState, on a suspend/resume we do see the
>>>>>>> length file along with the finalized file (finalized during resume):
>>>>>>>
>>>>>>> -rw-r--r-- 3 root hadoop 10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>>>
>>>>>>> That does make much more sense.
>>>>>>>
>>>>>>> I guess we should document --allowNonRestoredState better? It
>>>>>>> seems it actually drops state?
>>>>>>>
>>>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> This is 1.4 BTW. I am not sure that I am reading this correctly,
>>>>>>>> but the lifecycle of cancel/resume is 2 steps:
>>>>>>>>
>>>>>>>> 1. Cancel job with SP
>>>>>>>>
>>>>>>>> closeCurrentPartFile
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>>>
>>>>>>>> is called from close()
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>>>
>>>>>>>> and that moves files to pending state. That, I would presume, is
>>>>>>>> called when one does a cancel.
>>>>>>>>
>>>>>>>> 2. The restore on resume
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>>>
>>>>>>>> calls
>>>>>>>>
>>>>>>>> handleRestoredBucketState
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>>>
>>>>>>>> which clears the pending files from state without finalizing them?
>>>>>>>>
>>>>>>>> That does not seem to be right. I must be reading the code totally
>>>>>>>> wrong?
>>>>>>>>
>>>>>>>> I am also not sure whether --allowNonRestoredState is skipping
>>>>>>>> getting the state. At least
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>>>>>>> is not exactly clear about what it does if we add an operator (GDF I
>>>>>>>> think will add a new operator in the DAG without state even if
>>>>>>>> stateful; in my case the Map operator is not even stateful).
>>>>>>>>
>>>>>>>> Thanks, and please bear with me if this is all something pretty
>>>>>>>> simple.
>>>>>>>>
>>>>>>>> Vishal
>>>>>>>>
>>>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What should be the behavior of BucketingSink vis-a-vis state
>>>>>>>>> (pending, in-progress and finalization) when we suspend and resume?
>>>>>>>>>
>>>>>>>>> So I did this:
>>>>>>>>>
>>>>>>>>> * I had a pipe writing to HDFS, and did a suspend and resume using
>>>>>>>>> --allowNonRestoredState, as I had added a harmless MapOperator
>>>>>>>>> (stateless).
>>>>>>>>>
>>>>>>>>> * I see that a file on HDFS, the file being written to (before
>>>>>>>>> the cancel with savepoint), goes into a pending state:
>>>>>>>>> _part-0-21.pending
>>>>>>>>>
>>>>>>>>> * I see a new file being written to in the resumed pipe:
>>>>>>>>> _part-0-22.in-progress.
>>>>>>>>>
>>>>>>>>> What I do not see is the file _part-0-21.pending being
>>>>>>>>> finalized (as in renamed to just part-0-21). I would have assumed
>>>>>>>>> that would be the case in this controlled suspend/resume
>>>>>>>>> circumstance. Further, it is a rename, and an HDFS mv is not an
>>>>>>>>> expensive operation.
>>>>>>>>>
>>>>>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Vishal