Hi,

Sorry for the confusion. Yes, the framework (Flink) currently does not do any cleanup of pending files, so this would have to be done by a cleanup job on your side.
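A cleanup job of the kind suggested in this thread could start from a selection rule like the one below. This is a hypothetical, minimal Java sketch, not a Flink API: the class PendingFileCleanup, its FileEntry type and the selectStalePending method are invented for illustration. It assumes the default "_" prefix and ".pending" suffix visible in the listings in this thread, and it only picks candidates by age. Listing and deleting the files on HDFS is left out. Because a pending file can still be moved to "final" when a parallel instance of the sink recovers from a checkpoint, the age threshold should comfortably exceed the age of the oldest checkpoint or savepoint you might still restore from.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for a periodic cleanup job (not part of Flink):
// it only decides which pending files are old enough to delete.
final class PendingFileCleanup {

    // Assumed defaults, matching names such as "_part-0-19.pending" in this thread.
    static final String PENDING_PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";

    // One entry of a directory listing: bare file name plus modification time.
    static final class FileEntry {
        final String name;
        final Instant modified;

        FileEntry(String name, Instant modified) {
            this.name = name;
            this.modified = modified;
        }
    }

    // Returns the names of pending files last modified more than maxAge before now.
    static List<String> selectStalePending(List<FileEntry> files, Instant now, Duration maxAge) {
        Instant cutoff = now.minus(maxAge);
        List<String> stale = new ArrayList<>();
        for (FileEntry f : files) {
            boolean isPending = f.name.startsWith(PENDING_PREFIX) && f.name.endsWith(PENDING_SUFFIX);
            if (isPending && f.modified.isBefore(cutoff)) {
                stale.add(f.name);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-02-19T12:00:00Z");
        List<FileEntry> listing = Arrays.asList(
                new FileEntry("_part-0-18.valid-length", Instant.parse("2018-02-14T18:48:00Z")),
                new FileEntry("_part-0-19.pending", Instant.parse("2018-02-14T19:15:00Z")),
                new FileEntry("_part-0-22.pending", Instant.parse("2018-02-19T11:30:00Z")));
        // Prints [_part-0-19.pending]: the recent _part-0-22.pending is kept.
        System.out.println(selectStalePending(listing, now, Duration.ofDays(2)));
    }
}
```

Against HDFS, the listing could presumably come from Hadoop's FileSystem.listStatus(Path) together with FileStatus.getModificationTime(), and the selected files could be removed with FileSystem.delete(Path, false); that wiring is an assumption and is not shown here.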
Best,
Aljoscha

> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> >> You should only have these dangling pending files after a failure-recovery
> >> cycle, as you noticed. My suggestion would be to periodically clean up
> >> older pending files.
>
> I am a little confused. Is that something the framework should do, or something
> we should do as part of a cleanup job?
>
> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
>
> The BucketingSink does not clean up pending files on purpose. In a
> distributed setting, and especially with rescaling of Flink operators, it is
> sufficiently hard to figure out which of the pending files you can actually
> delete and which of them you have to leave because they will get moved to
> "final" as part of recovering from a checkpoint on some other parallel
> instance of the sink.
>
> You should only have these dangling pending files after a failure-recovery
> cycle, as you noticed. My suggestion would be to periodically clean up older
> pending files.
>
> Best,
> Aljoscha
>
>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Vishal,
>>
>> Pending files should indeed eventually get finalized. This happens on a
>> checkpoint complete notification. Thus, what you report does not seem right.
>> Maybe Aljoscha can shed a bit more light on the problem.
>>
>> In order to further debug the problem, it would be really helpful to get
>> access to DEBUG log files of a TM which runs the BucketingSink.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>> Hi Vishal,
>>
>> I have the same concern about savepointing in BucketingSink.
>> As for your question, I think the pending files are finalized before they get
>> cleared in handleRestoredBucketState.
>> They are finalized in notifyCheckpointComplete:
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>
>> I'm looking into this part of the source code now, since we are experiencing
>> some unclosed files after checkpointing.
>> It would be great if you could share more if you find something new about your
>> problem, which might help with ours.
>>
>> Best regards,
>> Mu
>>
>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> -rw-r--r-- 3 root hadoop 11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>> -rw-r--r-- 3 root hadoop 54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>> -rw-r--r-- 3 root hadoop 11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>
>> This is strange. We had a few retries because of an OOM on one of the TMs, and
>> we see this situation: two files (on either side) that were dealt with fine,
>> but a dangling .pending file in between. I am sure this is not what is meant
>> to happen. I think we have hit an edge condition, and looking at the code it
>> is not obvious. Maybe someone who wrote the code can shed some light on how
>> this can happen.
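Till's and Mu's point that pending files are finalized in notifyCheckpointComplete can be illustrated with a simplified model. The sketch below is hypothetical and is not Flink's BucketingSink code; PendingFileTracker and its methods are invented names, and the "renames" are just list operations. It assumes that pending files are grouped by the checkpoint in which they were snapshotted, and that a completed checkpoint also covers all older checkpoint ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified, hypothetical model of per-checkpoint pending-file bookkeeping.
// It is NOT Flink's BucketingSink; file "renames" are just list operations.
final class PendingFileTracker {

    // Files closed (moved to pending) since the last snapshot.
    private List<String> pendingFiles = new ArrayList<>();
    // Checkpoint id -> files that were pending when that checkpoint was taken.
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    // Files that would now be renamed to their final names.
    final List<String> finalized = new ArrayList<>();

    // Called when a part file is closed: in-progress -> pending.
    void closePartFile(String pendingName) {
        pendingFiles.add(pendingName);
    }

    // Called when the sink's state is snapshotted for a checkpoint.
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, pendingFiles);
        pendingFiles = new ArrayList<>();
    }

    // Called once the checkpoint has completed; it also covers every older checkpoint id.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            finalized.addAll(files);
        }
        done.clear(); // clearing the view removes these entries from pendingPerCheckpoint
    }

    // Everything not yet finalized, oldest checkpoint first.
    List<String> stillPending() {
        List<String> all = new ArrayList<>();
        for (List<String> files : pendingPerCheckpoint.values()) {
            all.addAll(files);
        }
        all.addAll(pendingFiles);
        return all;
    }

    public static void main(String[] args) {
        PendingFileTracker sink = new PendingFileTracker();
        sink.closePartFile("_part-0-1.pending");
        sink.snapshot(1L);
        sink.notifyCheckpointComplete(1L); // _part-0-1 is finalized
        sink.closePartFile("_part-0-2.pending");
        sink.snapshot(2L);
        // Suppose the TaskManager fails before checkpoint 2 completes, so no
        // notification for checkpoint 2 arrives and the job falls back to checkpoint 1.
        System.out.println("finalized: " + sink.finalized);          // finalized: [_part-0-1.pending]
        System.out.println("still pending: " + sink.stillPending()); // still pending: [_part-0-2.pending]
    }
}
```

Since a checkpoint that never completes cannot be restored from, nothing in the restored state refers to the second file in this model, which is consistent with Aljoscha's remark that dangling pending files only appear after a failure-recovery cycle.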
>>
>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Without --allowNonRestoredState, on a suspend/resume we do see the valid-length
>> file along with the finalized file (finalized during resume):
>>
>> -rw-r--r-- 3 root hadoop 10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>
>> That makes much more sense.
>>
>> I guess we should document --allowNonRestoredState better? It seems it
>> actually drops state?
>>
>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> This is 1.4, BTW. I am not sure that I am reading this correctly, but the
>> lifecycle of cancel/resume has two steps:
>>
>> 1. Cancel job with savepoint (SP)
>>
>> closeCurrentPartFile
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>
>> is called from close()
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>
>> and that moves files to pending state. That, I would presume, is called when
>> one does a cancel.
>>
>> 2. The restore on resume
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>
>> calls
>>
>> handleRestoredBucketState
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>
>> which clears the pending files from state without finalizing them?
>>
>> That does not seem right. Am I reading the code totally wrong?
>>
>> I am also not sure whether --allowNonRestoredState skips restoring the
>> state. At least
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>> is not exactly clear about what it does if we add an operator (GDF, I think,
>> will add a new operator to the DAG without state even if it is stateful; in my
>> case the Map operator is not even stateful).
>>
>> Thanks, and please bear with me if this is all something pretty simple.
>>
>> Vishal
>>
>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> What should be the behavior of BucketingSink vis-à-vis state (pending,
>> in-progress and finalization) when we suspend and resume?
>>
>> So I did this:
>>
>> * I suspended and resumed a pipe writing to HDFS using
>> --allowNonRestoredState, since I had added a harmless MapOperator (stateless).
>>
>> * I see that the file on HDFS that was being written to (before the cancel
>> with savepoint) goes into a pending state: _part-0-21.pending
>>
>> * I see a new file being written to in the resumed pipe:
>> _part-0-22.in-progress
>>
>> What I do not see is _part-0-21.pending being finalized (as in renamed to
>> just part-0-21). I would have assumed that would be the case in this
>> controlled suspend/resume circumstance. Further, it is a rename, and an
>> HDFS mv is not an expensive operation.
>>
>> Am I understanding the process correctly, and if so, any pointers?
>>
>> Regards,
>>
>> Vishal
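The finalization Vishal expected is, as he notes, only a rename from the pending name to the final name. A hypothetical helper for that name mapping is sketched below; PendingNames and finalNameFor are invented for illustration, and the "_" prefix and ".pending" suffix are assumed from the file names seen in this thread.

```java
// Hypothetical helper: maps a pending file name to the final name that a
// finalization rename would produce. Assumes the "_" prefix and ".pending"
// suffix seen in the file names in this thread.
final class PendingNames {

    static final String PENDING_PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";

    // "_part-0-21.pending" -> "part-0-21"; rejects names that are not pending files.
    static String finalNameFor(String pendingName) {
        int minLength = PENDING_PREFIX.length() + PENDING_SUFFIX.length() + 1;
        if (pendingName.length() < minLength
                || !pendingName.startsWith(PENDING_PREFIX)
                || !pendingName.endsWith(PENDING_SUFFIX)) {
            throw new IllegalArgumentException("not a pending file name: " + pendingName);
        }
        return pendingName.substring(PENDING_PREFIX.length(),
                pendingName.length() - PENDING_SUFFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(finalNameFor("_part-0-21.pending")); // part-0-21
    }
}
```

The helper only computes the name. Whether a particular pending file should be renamed or deleted still depends on which checkpoint it belongs to, as discussed in the replies above. On HDFS the rename itself would presumably go through Hadoop's FileSystem.rename(Path, Path).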