That is fine, as long as Flink assures at-least-once semantics? If the contents of a .pending file are, through the turbulence (restarts etc.), assured to be in another file, then anything starting with an underscore ("_") will by default be ignored by Hadoop (Hive, MR, etc.).
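To make the two points above concrete, here is a rough sketch in Python, with the assumptions stated up front. The first function mirrors the default hidden-file rule applied by Hadoop's FileInputFormat (names starting with "_" or "." are skipped). The second picks out .pending files older than a cutoff as candidates for our own cleanup job. The function names, the (path, mtime) input shape and the 24-hour threshold are made up for illustration; listing and deleting on HDFS is left out. Since a pending file may still be moved to "final" during recovery, the age threshold is a judgement call, not a guarantee.

```python
from pathlib import PurePosixPath


def is_hidden_from_hadoop(path: str) -> bool:
    """Mirror the default hidden-file rule: names starting with '_' or '.' are skipped."""
    name = PurePosixPath(path).name
    return name.startswith("_") or name.startswith(".")


def stale_pending(entries, now, max_age_seconds=24 * 3600):
    """Return paths of .pending files whose modification time is older than the cutoff.

    entries: iterable of (path, mtime_epoch_seconds) pairs (hypothetical input shape,
             e.g. produced by whatever HDFS listing tool is in use).
    now:     current time in epoch seconds, passed in so the check is deterministic.
    """
    stale = []
    for path, mtime in entries:
        name = PurePosixPath(path).name
        if name.endswith(".pending") and now - mtime > max_age_seconds:
            stale.append(path)
    return stale
```

Anything returned by stale_pending is only a candidate; whether it is safe to delete depends on the data really being present in another file.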
On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> Sorry for the confusion. The framework (Flink) does currently not do any
> cleanup of pending files, yes.
>
> Best,
> Aljoscha
>
> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> >> You should only have these dangling pending files after a
> >> failure-recovery cycle, as you noticed. My suggestion would be to
> >> periodically clean up older pending files.
>
> A little confused. Is that what the framework should do, or us as part of
> some cleanup job?
>
> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> The BucketingSink does not clean up pending files on purpose. In a
>> distributed setting, and especially with rescaling of Flink operators, it
>> is sufficiently hard to figure out which of the pending files you actually
>> can delete and which of them you have to leave because they will get moved
>> to "final" as part of recovering from a checkpoint on some other parallel
>> instance of the sink.
>>
>> You should only have these dangling pending files after a
>> failure-recovery cycle, as you noticed. My suggestion would be to
>> periodically clean up older pending files.
>>
>> Best,
>> Aljoscha
>>
>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Vishal,
>>
>> the pending files should indeed eventually get finalized. This happens
>> on a checkpoint complete notification. Thus, what you report seems not
>> right. Maybe Aljoscha can shed a bit more light on the problem.
>>
>> In order to further debug the problem, it would be really helpful to get
>> access to DEBUG log files of a TM which runs the BucketingSink.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> I have the same concern about savepointing in BucketingSink.
>>> As for your question, I think that before the pending files get cleared in
>>> handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>
>>> I'm looking into this part of the source code now, since we are
>>> experiencing some unclosed files after checkpointing.
>>> It would be great if you could share more if you find something new about
>>> your problem, which might help with our problem.
>>>
>>> Best regards,
>>> Mu
>>>
>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>> -rw-r--r-- 3 root hadoop 54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>
>>>> This is strange. We had a few retries because of an OOM on one of the TMs
>>>> and we see this situation: 2 files (on either side) that were dealt
>>>> with fine, but a dangling .pending file. I am sure this is not what is meant
>>>> to be. I think we have an edge condition, and looking at the code it is
>>>> not obvious. Maybe someone who wrote the code can shed some light as to
>>>> how this can happen.
>>>>
>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Without --allowNonRestoredState, on a suspend/resume we do see the
>>>>> length file along with the finalized file (finalized during resume)
>>>>>
>>>>> -rw-r--r-- 3 root hadoop 10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>
>>>>> That does make much more sense.
>>>>>
>>>>> I guess we should document --allowNonRestoredState better ? It seems
>>>>> it actually drops state ?
>>>>>
>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> This is 1.4 BTW. I am not sure that I am reading this correctly but
>>>>>> the lifecycle of cancel/resume is 2 steps
>>>>>>
>>>>>> 1. Cancel job with SP
>>>>>>
>>>>>> closeCurrentPartFile
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>
>>>>>> is called from close()
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>
>>>>>> and that moves files to pending state. That I would presume is
>>>>>> called when one does a cancel.
>>>>>>
>>>>>> 2. The restore on resume
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>
>>>>>> calls
>>>>>>
>>>>>> handleRestoredBucketState
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>
>>>>>> clears the pending files from state without finalizing them?
>>>>>>
>>>>>> That does not seem to be right. I must be reading the code totally
>>>>>> wrong ?
>>>>>>
>>>>>> I am not sure also whether --allowNonRestoredState is skipping
>>>>>> getting the state .
>>>>>> At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not
>>>>>> exactly clear about what it does if we add an operator (GDF I think will add a
>>>>>> new operator in the DAG without state even if stateful; in my case the
>>>>>> Map operator is not even stateful).
>>>>>>
>>>>>> Thanks, and please bear with me if this is all something pretty simple.
>>>>>>
>>>>>> Vishal
>>>>>>
>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> What should be the behavior of BucketingSink vis-a-vis state
>>>>>>> (pending, in-progress and finalization) when we suspend and resume?
>>>>>>>
>>>>>>> So I did this:
>>>>>>>
>>>>>>> * I had a pipe writing to hdfs suspend and resume using
>>>>>>> --allowNonRestoredState, as in I had added a harmless MapOperator
>>>>>>> (stateless).
>>>>>>>
>>>>>>> * I see that a file on hdfs, the file being written to (before the
>>>>>>> cancel with savepoint), goes into a pending state:
>>>>>>> _part-0-21.pending
>>>>>>>
>>>>>>> * I see a new file being written to in the resumed pipe:
>>>>>>> _part-0-22.in-progress.
>>>>>>>
>>>>>>> What I do not see is the file _part-0-21.pending being
>>>>>>> finalized (as in renamed to just part-0-21). I would have assumed that
>>>>>>> would be the case in this controlled suspend/resume circumstance.
>>>>>>> Further, it is a rename, and hdfs mv is not an expensive operation.
>>>>>>>
>>>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Vishal
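
For reference, the file-name transitions discussed in this thread can be sketched roughly as below. This is a toy model in Python, not Flink's code: it only encodes the naming seen in the listings above (_part-0-22.in-progress, _part-0-21.pending, part-0-21), and the function and constant names are made up for illustration. In the thread, the first rename is attributed to close() / closeCurrentPartFile and the second to the checkpoint complete notification.

```python
# Naming conventions taken from the file listings in this thread.
PREFIX = "_"
IN_PROGRESS_SUFFIX = ".in-progress"
PENDING_SUFFIX = ".pending"


def to_pending(in_progress_name: str) -> str:
    """Model the in-progress -> pending rename (e.g. when the part file is closed)."""
    if not (in_progress_name.startswith(PREFIX)
            and in_progress_name.endswith(IN_PROGRESS_SUFFIX)):
        raise ValueError(f"not an in-progress file name: {in_progress_name}")
    return in_progress_name[: -len(IN_PROGRESS_SUFFIX)] + PENDING_SUFFIX


def to_final(pending_name: str) -> str:
    """Model the pending -> final rename: drop the '_' prefix and '.pending' suffix."""
    if not (pending_name.startswith(PREFIX)
            and pending_name.endswith(PENDING_SUFFIX)):
        raise ValueError(f"not a pending file name: {pending_name}")
    return pending_name[len(PREFIX): -len(PENDING_SUFFIX)]
```

A dangling pending file, in these terms, is one for which the second rename never happened.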