Hi Vishal, pending files should indeed eventually get finalized. This happens when the checkpoint complete notification arrives. Thus, what you report does not seem right. Maybe Aljoscha can shed a bit more light on the problem.
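For readers following the thread, here is a minimal sketch of the hand-off described above: closed part files become pending, a snapshot ties them to a checkpoint id, and the checkpoint complete notification finalizes them. This is a toy model, not the BucketingSink source. The class name, fields, and getters are hypothetical, and the real sink renames files on the file system rather than moving strings between lists.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of the pending -> finalized hand-off. Hypothetical, not Flink code. */
class PendingFileLifecycle {
    // Pending files not yet captured by any checkpoint.
    private final List<String> pendingFiles = new ArrayList<>();
    // Pending files grouped by the checkpoint id that captured them.
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    // Files that have been "renamed" to their final name.
    private final List<String> finalizedFiles = new ArrayList<>();

    /** Closing a part file moves it into the pending state. */
    void closeCurrentPartFile(String partName) {
        pendingFiles.add(partName);
    }

    /** A snapshot ties the currently pending files to the given checkpoint id. */
    void snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
        pendingFiles.clear();
    }

    /** Completion of a checkpoint finalizes everything pending up to and including that id. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            finalizedFiles.addAll(it.next().getValue());
            it.remove();
        }
    }

    List<String> getFinalizedFiles() {
        return finalizedFiles;
    }

    int getPendingCheckpointCount() {
        return pendingPerCheckpoint.size();
    }

    public static void main(String[] args) {
        PendingFileLifecycle sink = new PendingFileLifecycle();
        sink.closeCurrentPartFile("part-0-19");
        sink.snapshotState(1L);
        // Nothing is finalized until the notification arrives.
        System.out.println("before notification: " + sink.getFinalizedFiles());
        sink.notifyCheckpointComplete(1L);
        System.out.println("after notification: " + sink.getFinalizedFiles());
    }
}
```

In this model a file stays pending forever if the notification for its checkpoint (or a later one) never arrives, which is one way a dangling pending file could come about.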
To debug the problem further, it would be really helpful to get access to the DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:

> Hi Vishal,
>
> I have the same concern about savepointing in BucketingSink.
> As for your question, I think that before the pending files get cleared in
> handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>
> I'm looking into this part of the source code now, since we are
> experiencing some unclosed files after checkpointing.
> It would be great if you could share more if you find something new about
> your problem, which might help with ours.
>
> Best regards,
> Mu
>
> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> -rw-r--r-- 3 root hadoop 11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>
>> -rw-r--r-- 3 root hadoop 54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>
>> -rw-r--r-- 3 root hadoop 11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>
>> This is strange. We had a few retries because of an OOM on one of the TMs
>> and we see this situation: 2 files (on either side) that were dealt
>> with fine, but a dangling .pending file. I am sure this is not what is meant
>> to be. I think we have an edge condition, and looking at the code it is
>> not obvious. Maybe someone who wrote the code can shed some light on
>> how this can happen.
>>
>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Without --allowNonRestoredState, on a suspend/resume we do see the
>>> length file along with the finalized file (finalized during resume):
>>>
>>> -rw-r--r-- 3 root hadoop 10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>
>>> That does make much more sense.
>>>
>>> I guess we should document --allowNonRestoredState better? It seems it
>>> actually drops state?
>>>
>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> This is 1.4 BTW. I am not sure that I am reading this correctly, but
>>>> the lifecycle of cancel/resume is 2 steps:
>>>>
>>>> 1. Cancel job with SP
>>>>
>>>> closeCurrentPartFile
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>
>>>> is called from close()
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>
>>>> and that moves files to the pending state. That, I would presume, is called
>>>> when one does a cancel.
>>>>
>>>> 2. The restore on resume
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>
>>>> calls
>>>>
>>>> handleRestoredBucketState
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>
>>>> which clears the pending files from state without finalizing them?
>>>>
>>>> That does not seem to be right.
>>>> I must be reading the code totally wrong?
>>>>
>>>> I am also not sure whether --allowNonRestoredState is skipping getting
>>>> the state. At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not
>>>> exactly clear about what it does if we add an operator (GDF I think will add a
>>>> new operator in the DAG without state even if stateful; in my case the Map
>>>> operator is not even stateful).
>>>>
>>>> Thanks, and please bear with me if this is all something pretty simple.
>>>>
>>>> Vishal
>>>>
>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> What should be the behavior of BucketingSink vis-a-vis state (pending,
>>>>> in-progress and finalization) when we suspend and resume?
>>>>>
>>>>> So I did this:
>>>>>
>>>>> * I had a pipe writing to hdfs suspend and resume using
>>>>> --allowNonRestoredState, as I had added a harmless MapOperator
>>>>> (stateless).
>>>>>
>>>>> * I see that a file on hdfs, the file being written to (before the
>>>>> cancel with savepoint), goes into a pending state: _part-0-21.pending
>>>>>
>>>>> * I see a new file being written to in the resumed pipe:
>>>>> _part-0-22.in-progress.
>>>>>
>>>>> What I do not see is the file _part-0-21.pending being finalized
>>>>> (as in renamed to just part-0-21). I would have assumed that would be
>>>>> the case in this controlled suspend/resume circumstance. Further, it is a
>>>>> rename, and hdfs mv is not an expensive operation.
>>>>>
>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Vishal
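The file names quoted throughout this thread follow a visible pattern: a leading underscore plus a suffix of .in-progress, .pending, or .valid-length, and a bare name such as part-0-21 once finalized. As a rough sketch, assuming only that pattern (the class and method names below are hypothetical and are not part of Flink), a small helper can classify a listing and show what a pending file would be renamed to, which may help when hunting for dangling .pending files:

```java
/** Classifies part-file names by the prefix and suffixes seen in this thread. Hypothetical helper. */
class PartFileNames {
    // Prefix and suffixes as they appear in the HDFS listings quoted above.
    static final String PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";
    static final String VALID_LENGTH_SUFFIX = ".valid-length";

    enum State { IN_PROGRESS, PENDING, VALID_LENGTH, FINALIZED }

    /** Maps a file name to its state; names without a known prefix/suffix pair count as finalized. */
    static State classify(String fileName) {
        if (fileName.startsWith(PREFIX)) {
            if (fileName.endsWith(IN_PROGRESS_SUFFIX)) return State.IN_PROGRESS;
            if (fileName.endsWith(PENDING_SUFFIX)) return State.PENDING;
            if (fileName.endsWith(VALID_LENGTH_SUFFIX)) return State.VALID_LENGTH;
        }
        return State.FINALIZED;
    }

    /** Returns the name a pending file would carry after the finalizing rename. */
    static String finalizedName(String pendingName) {
        if (classify(pendingName) != State.PENDING) {
            throw new IllegalArgumentException("not a pending file: " + pendingName);
        }
        // Strip the leading prefix and the trailing pending suffix.
        return pendingName.substring(
                PREFIX.length(), pendingName.length() - PENDING_SUFFIX.length());
    }

    public static void main(String[] args) {
        String[] listing = {
            "_part-0-18.valid-length", "_part-0-19.pending", "_part-0-22.in-progress", "part-0-21"
        };
        for (String name : listing) {
            System.out.println(name + " is " + classify(name));
        }
        System.out.println("finalized name: " + finalizedName("_part-0-21.pending"));
    }
}
```

If the sink is configured with different prefixes or suffixes, the constants would need to change accordingly.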