I would expect that to be possible as well, yes.
> On 21. Apr 2018, at 17:33, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> >> After the savepoint state has been written, the sink might start new
> >> .in-progress files. These files are not part of the savepoint but renamed
> >> to .pending in close().
> >> On restore all pending files that are part of the savepoint are moved into
> >> final state (and possibly truncated). See handlePendingInProgressFiles()
> >> method.
> >> Pending files that are not part of the savepoint (because they were
> >> created later between taking the savepoint and shutting the job down) are
> >> not touched and remain as .pending files.
>
> >> These should be the .pending files that you observe. Since they contain
> >> data that is not part of the savepoint, it should be safe to delete them.
> >> If you keep them, you will have at-least-once output.
>
>
> Is the above also possible with an in-progress file? I had a situation where
> we saw such a dangling file through a restart on error.
>
>
> On Wed, Feb 21, 2018 at 8:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> Thank you Fabian,
>
> What is more important ( and I think you might have addressed it in your
> post so sorry for being a little obtuse ) is that deleting them does not
> violate "at-least-once" delivery. And if that is definite, then we are fine
> with it, though we will test it further.
>
> Thanks and Regards.
>
>
>
> On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Vishal, hi Mu,
>
> After the savepoint state has been written, the sink might start new
> .in-progress files. These files are not part of the savepoint but renamed to
> .pending in close().
> On restore all pending files that are part of the savepoint are moved into
> final state (and possibly truncated). See handlePendingInProgressFiles()
> method.
> Pending files that are not part of the savepoint (because they were created
> later between taking the savepoint and shutting the job down) are not touched
> and remain as .pending files.
>
> These should be the .pending files that you observe. Since they contain data
> that is not part of the savepoint, it should be safe to delete them.
> If you keep them, you will have at-least-once output.
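>
> For illustration only, a rough sketch of that restore behaviour (this is not
> the actual BucketingSink code; the class and helper names are made up, and it
> assumes the default "_" pending prefix and ".pending" suffix):
>
>     import java.io.IOException;
>     import java.util.List;
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>
>     // Pending files recorded in the savepoint are promoted to their final
>     // names on restore; pending files created after the savepoint are not in
>     // the restored state at all and stay as _*.pending files on disk.
>     class RestoreSketch {
>
>         // default naming: _part-0-19.pending -> part-0-19
>         static Path finalPathFor(Path pending) {
>             String name = pending.getName();
>             String finalName = name.substring(1, name.length() - ".pending".length());
>             return new Path(pending.getParent(), finalName);
>         }
>
>         static void promotePendingFiles(List<Path> pendingFromSavepoint) throws IOException {
>             FileSystem fs = FileSystem.get(new Configuration());
>             for (Path pending : pendingFromSavepoint) {
>                 fs.rename(pending, finalPathFor(pending));
>             }
>             // Anything still named _*.pending afterwards was not part of the
>             // savepoint and can be deleted (or kept, for at-least-once output).
>         }
>     }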
>
> Best, Fabian
>
>
> 2018-02-21 5:04 GMT+01:00 Mu Kong <kong.mu....@gmail.com>:
> Hi Aljoscha,
>
> Thanks for confirming the fact that Flink doesn't clean up pending files.
> Is it safe to clean (remove) all the pending files after cancel (w/ or w/o
> savepointing) or failover?
> If we do that, will we lose some data?
>
> Thanks!
>
> Best,
> Mu
>
>
>
>
> On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> Sorry, but I just wanted to confirm: does the assertion of "at-least-once"
> delivery hold true if there is a dangling pending file?
>
> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> That is fine, as long as Flink assures at-least-once semantics?
>
> If the contents of a .pending file, through the turbulence ( restarts etc ),
> are assured to be in another file, then anything starting with an "_"
> underscore will by default be ignored by Hadoop ( Hive or MR etc ).
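>
> ( For reference, this is roughly the default hidden-file convention that
> Hadoop input formats apply, sketched from memory rather than quoted from
> Hadoop, and it is why _part-*.pending files are invisible to Hive / MR reads: )
>
>     import org.apache.hadoop.fs.Path;
>     import org.apache.hadoop.fs.PathFilter;
>
>     // Paths whose names start with "_" or "." are treated as hidden and are
>     // skipped by the default input formats, so _*.pending output is ignored.
>     public class HiddenFileFilterSketch implements PathFilter {
>         @Override
>         public boolean accept(Path p) {
>             String name = p.getName();
>             return !name.startsWith("_") && !name.startsWith(".");
>         }
>     }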
>
>
>
> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
>
> Sorry for the confusion. The framework (Flink) currently does not do any
> cleanup of pending files, yes.
>
> Best,
> Aljoscha
>
>
>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>> >> You should only have these dangling pending files after a
>> >> failure-recovery cycle, as you noticed. My suggestion would be to
>> >> periodically clean up older pending files.
>>
>> I am a little confused. Is that something the framework should do, or something
>> we should do as part of some cleanup job?
>>
>>
>>
>>
>>
>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>>
>> The BucketingSink does not clean up pending files on purpose. In a
>> distributed setting, and especially with rescaling of Flink operators, it is
>> sufficiently hard to figure out which of the pending files you actually can
>> delete and which of them you have to leave because they will get moved to
>> "final" as part of recovering from a checkpoint on some other parallel
>> instance of the sink.
>>
>> You should only have these dangling pending files after a failure-recovery
>> cycle, as you noticed. My suggestion would be to periodically clean up older
>> pending files.
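>>
>> A minimal sketch of such a periodic cleanup, assuming the default ".pending"
>> suffix; the class name, the retention window and running it per bucket
>> directory are choices of your own, not part of Flink:
>>
>>     import java.io.IOException;
>>     import org.apache.hadoop.conf.Configuration;
>>     import org.apache.hadoop.fs.FileStatus;
>>     import org.apache.hadoop.fs.FileSystem;
>>     import org.apache.hadoop.fs.Path;
>>
>>     // Deletes *.pending files in one bucket directory once they are older
>>     // than a retention window. Only run this when you are sure no restore
>>     // could still promote those files to their final names.
>>     public class PendingFileCleanup {
>>         public static void cleanUp(Path bucketDir, long maxAgeMillis) throws IOException {
>>             FileSystem fs = bucketDir.getFileSystem(new Configuration());
>>             long cutoff = System.currentTimeMillis() - maxAgeMillis;
>>             for (FileStatus status : fs.listStatus(bucketDir)) {
>>                 if (status.isFile()
>>                         && status.getPath().getName().endsWith(".pending")
>>                         && status.getModificationTime() < cutoff) {
>>                     fs.delete(status.getPath(), false);
>>                 }
>>             }
>>         }
>>     }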
>>
>> Best,
>> Aljoscha
>>
>>
>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> Hi Vishal,
>>>
>>> The pending files should indeed get finalized eventually. This happens on
>>> a checkpoint complete notification. Thus, what you report does not seem right.
>>> Maybe Aljoscha can shed a bit more light on the problem.
>>>
>>> In order to further debug the problem, it would be really helpful to get
>>> access to DEBUG log files of a TM which runs the BucketingSink.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>>> Hi Vishal,
>>>
>>> I have the same concern about savepointing in BucketingSink.
>>> As for your question, I think that before the pending files get cleared in
>>> handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
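>>>
>>> In rough, simplified form (this is a sketch, not the code behind that link;
>>> the class and field names are invented), the bookkeeping is: pending files
>>> are remembered per checkpoint id and only promoted to their final names once
>>> that checkpoint is confirmed complete:
>>>
>>>     import java.io.IOException;
>>>     import java.util.Iterator;
>>>     import java.util.List;
>>>     import java.util.Map;
>>>     import java.util.TreeMap;
>>>     import org.apache.hadoop.conf.Configuration;
>>>     import org.apache.hadoop.fs.FileSystem;
>>>     import org.apache.hadoop.fs.Path;
>>>
>>>     class NotifySketch {
>>>         // checkpoint id -> pending files that were closed before that checkpoint
>>>         final TreeMap<Long, List<Path>> pendingFilesPerCheckpoint = new TreeMap<>();
>>>
>>>         void notifyCheckpointComplete(long completedCheckpointId) throws IOException {
>>>             FileSystem fs = FileSystem.get(new Configuration());
>>>             Iterator<Map.Entry<Long, List<Path>>> it =
>>>                     pendingFilesPerCheckpoint.entrySet().iterator();
>>>             while (it.hasNext()) {
>>>                 Map.Entry<Long, List<Path>> entry = it.next();
>>>                 if (entry.getKey() > completedCheckpointId) {
>>>                     continue;
>>>                 }
>>>                 for (Path pending : entry.getValue()) {
>>>                     // e.g. _part-0-19.pending -> part-0-19
>>>                     String name = pending.getName();
>>>                     String finalName = name.substring(1, name.length() - ".pending".length());
>>>                     fs.rename(pending, new Path(pending.getParent(), finalName));
>>>                 }
>>>                 it.remove();
>>>             }
>>>         }
>>>     }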
>>>
>>> I'm looking into this part of the source code now, since we are
>>> experiencing some unclosed files after checkpointing.
>>> It would be great if you could share more when you find something new about
>>> your problem; it might help with ours.
>>>
>>> Best regards,
>>> Mu
>>>
>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 18:48
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>> -rw-r--r-- 3 root hadoop 54053518 2018-02-14 19:15
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 21:17
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>
>>>
>>> This is strange: we had a few retries because of an OOM on one of the TMs and
>>> we see this situation. Two files ( on either side ) were dealt with fine, but
>>> there is a dangling .pending file. I am sure this is not what is meant to
>>> happen. I think we have an edge condition, and looking at the code it is not
>>> obvious. Maybe someone who wrote the code can shed some light on how this
>>> can happen.
>>>
>>>
>>>
>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> Without --allowNonRestoredState, on a suspend/resume we do see the valid-length
>>> file along with the finalized file ( finalized during resume ):
>>>
>>> -rw-r--r-- 3 root hadoop 10 2018-02-09 13:57
>>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>
>>> That does make much more sense.
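>>>
>>> For anyone reading those files downstream, a sketch of how the .valid-length
>>> side file could be honoured (the class name is made up, and it assumes the
>>> sink stored the length as a string written with writeUTF, which the 10/11
>>> byte file sizes above suggest; please verify against your Flink version):
>>>
>>>     import java.io.IOException;
>>>     import org.apache.hadoop.conf.Configuration;
>>>     import org.apache.hadoop.fs.FSDataInputStream;
>>>     import org.apache.hadoop.fs.FileSystem;
>>>     import org.apache.hadoop.fs.Path;
>>>
>>>     // The returned length is the number of bytes of the matching part file
>>>     // that are covered by the restored state; readers should stop there.
>>>     class ValidLengthSketch {
>>>         static long readValidLength(Path validLengthFile) throws IOException {
>>>             FileSystem fs = validLengthFile.getFileSystem(new Configuration());
>>>             try (FSDataInputStream in = fs.open(validLengthFile)) {
>>>                 return Long.parseLong(in.readUTF());
>>>             }
>>>         }
>>>     }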
>>>
>>> I guess we should document --allowNonRestoredState better ? It seems it
>>> actually drops state ?
>>>
>>>
>>>
>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> This is 1.4, BTW. I am not sure that I am reading this correctly, but the
>>> lifecycle of cancel/resume is 2 steps:
>>>
>>>
>>>
>>> 1. Cancel job with SP
>>>
>>>
>>> closeCurrentPartFile
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>
>>> is called from close()
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>
>>>
>>> and that moves files to the pending state. That, I would presume, is called
>>> when one does a cancel.
>>>
>>>
>>>
>>> 2. The restore on resume
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>
>>> calls
>>>
>>> handleRestoredBucketState
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>
>>> clears the pending files from state without finalizing them?
>>>
>>>
>>>
>>> That does not seem to be right. I must be reading the code totally wrong ?
>>>
>>> I am also not sure whether --allowNonRestoredState is skipping restoring the
>>> state. At least
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>> is not exactly clear about what it does if we add an operator ( GDF I think
>>> will add a new operator in the DAG without state, even if it is stateful; in
>>> my case the Map operator is not even stateful ).
>>>
>>>
>>> Thanks and please bear with me if this is all something pretty simple.
>>>
>>> Vishal
>>>
>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> What should the behavior of BucketingSink be vis-à-vis state ( pending,
>>> in-progress and finalization ) when we suspend and resume?
>>>
>>> So I did this
>>>
>>> * I had a pipe writing to HDFS which I suspended and resumed using
>>> --allowNonRestoredState, since I had added a harmless MapOperator
>>> ( stateless ).
>>>
>>> * I see that the file on HDFS being written to ( before the cancel with
>>> savepoint ) goes into a pending state: _part-0-21.pending
>>>
>>> * I see a new file being written to in the resumed pipe
>>> _part-0-22.in-progress.
>>>
>>> What I do not see is the file _part-0-21.pending being finalized ( as in
>>> renamed to just part-0-21 ). I would have assumed that would be the case in
>>> this controlled suspend/resume circumstance. Further, it is a rename, and an
>>> HDFS mv is not an expensive operation.
>>>
>>>
>>> Am I understanding the process correctly, and if yes, any pointers?
>>>
>>> Regards,
>>>
>>> Vishal