As in, if there are chk handles in ZK, there should be no reason to start a
new job (bad handle, no HDFS connectivity, etc.).
Yes, that sums it up.
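
For illustration, the behaviour summed up above could be sketched roughly as
follows. This is a minimal, hypothetical sketch and not Flink's actual
ZooKeeperCompletedCheckpointStore: RecoverySketch, recover(), the loader
function and RecoveryFailedException are names made up for this example. It
only shows the two rules discussed: an unreadable handle is skipped but never
deleted, and recovery fails if handles exist but none of them can be read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the recovery policy discussed in this thread.
// None of these names come from Flink itself.
final class RecoverySketch {

    /** Signals that handles were present but no checkpoint could be read. */
    public static final class RecoveryFailedException extends RuntimeException {
        public RecoveryFailedException(String message) {
            super(message);
        }
    }

    /**
     * Tries to load the checkpoint behind every handle. A handle that cannot
     * be read is skipped but NOT removed, so a later retry can still use it.
     */
    public static <H, C> List<C> recover(List<H> handles, Function<H, C> loader) {
        List<C> restored = new ArrayList<>();
        for (H handle : handles) {
            try {
                C checkpoint = loader.apply(handle);
                if (checkpoint != null) {
                    restored.add(checkpoint);
                }
            } catch (RuntimeException e) {
                // The store may only be temporarily unreachable (e.g. NN down),
                // so the handle is left in place instead of being cleaned up.
                System.err.println("Could not read " + handle + ": " + e.getMessage());
            }
        }
        // Handles exist but nothing was readable: fail rather than start blank.
        if (!handles.isEmpty() && restored.isEmpty()) {
            throw new RecoveryFailedException(
                "Found " + handles.size() + " checkpoint handle(s) but could not read any of them");
        }
        return restored;
    }

    public static void main(String[] args) {
        List<String> handles = Arrays.asList("chk-44286");

        // Store reachable: the checkpoint is restored.
        System.out.println(recover(handles, h -> "state of " + h));

        // Store unreachable: recovery fails and the handle list is untouched.
        try {
            RecoverySketch.<String, String>recover(handles, h -> {
                throw new IllegalStateException("NameNode unavailable");
            });
        } catch (RecoveryFailedException e) {
            System.out.println("Recovery failed: " + e.getMessage());
        }
    }
}
```

With an empty handle list the method returns an empty list, which is the only
case in which starting from a blank state would be legitimate.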

On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Wait a sec, I just checked out the code again and it seems we already do
> that:
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>
> If there were some checkpoints but none could be read we fail recovery.
>
>
> On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> That sounds reasonable: We would keep the first fix, i.e. never delete
> checkpoints if they're "corrupt", only when they're subsumed. Additionally,
> we fail the job if there are some checkpoints in ZooKeeper but none of them
> can be restored to prevent the case where a job starts from scratch even
> though it shouldn't.
>
> Does that sum it up?
>
> On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> If we hit the retry limit, abort the job. In our case we will restart from
> the last SP (we, like any production pipeline, do it n times a day), and
> that, I would think, should be OK for most folks?
>
> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Thank you for considering this. If I understand you correctly:
>>
>> * A CHK pointer on ZK for a CHK state on HDFS was written successfully.
>> * Some issue restarted the pipeline.
>> * Unfortunately the NN was down, and Flink could not retrieve the CHK
>> state from the CHK pointer on ZK.
>>
>> Before
>>
>> * The CHK pointer was being removed and the job started from a brand new
>> slate.
>>
>> After (this fix, on 1.4+)
>>
>> * Do not delete the CHK pointer (it has to be subsumed to be deleted).
>> * Flink keeps using this CHK pointer (if retry is > 0, and until we hit
>> any retry limit) to restore state.
>> * NN comes back.
>> * Flink restores state on the next retry.
>>
>> I would hope that is the sequence to follow.
>>
>> Regards.
>>
>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> I think you might be right. We fixed the problem that checkpoints were
>>> dropped via https://issues.apache.org/jira/browse/FLINK-7783. However,
>>> we still have the problem that if the DFS is not up at all then it will
>>> look as if the job is starting from scratch. The alternative is
>>> failing the job, in which case you will also never be able to restore from
>>> a checkpoint. What do you think?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Sorry for the late reply.
>>>
>>> I created FLINK-8487 [1] to track this problem.
>>>
>>> @Vishal, can you have a look and check if I forgot some details? I
>>> logged the issue for Flink 1.3.2, is that correct?
>>> Please add more information if you think it is relevant.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>>
>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> Or this one
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>>
>>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> ping.
>>>>>
>>>>>     This happened again on production and it seems reasonable to abort
>>>>> when a checkpoint is not found rather than behave as if it is a brand new
>>>>> pipeline.
>>>>>
>>>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Folks, sorry for being late on this. Can somebody with knowledge
>>>>>> of this code base create a Jira issue for the above? We have seen this
>>>>>> more than once in production.
>>>>>>
>>>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <
>>>>>> aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> Some relevant Jira issues for you are:
>>>>>>>
>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>>>>>> failed checkpoints
>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>>>>>> fallback to earlier checkpoint when checkpoint restore fails
>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>>>>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>
>>>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> it would be great if you could create a JIRA ticket with Blocker
>>>>>>> priority.
>>>>>>> Please add all relevant information of your detailed analysis, add a
>>>>>>> link to this email thread (see [1] for the web archive of the mailing
>>>>>>> list), and post the id of the JIRA issue here.
>>>>>>>
>>>>>>> Thanks for looking into this!
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Fabian
>>>>>>>
>>>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>>>
>>>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> Thank you for confirming.
>>>>>>>>
>>>>>>>>
>>>>>>>> I think this is a critical bug. In essence, a job on any checkpoint
>>>>>>>> store (HDFS/S3/file) will lose state if the store is unavailable at
>>>>>>>> resume. This becomes all the more painful with your confirming "failed
>>>>>>>> checkpoints killing the job", because essentially it means that if the
>>>>>>>> remote store is unavailable during a checkpoint then you have lost
>>>>>>>> state (unless of course you have a retry of none or an unbounded retry
>>>>>>>> delay, a delay within which you *hope* the store revives). Remember,
>>>>>>>> the first retry failure will cause new state, according to the code as
>>>>>>>> written, iff the remote store is down. We would rather have a
>>>>>>>> configurable property that establishes our desire to abort, something
>>>>>>>> like "abort_retry_on_chkretrevalfailure".
>>>>>>>>
>>>>>>>>
>>>>>>>> In our case it is very important that we do not undercount a
>>>>>>>> window (one reason we use Flink and its awesome failure guarantees), as
>>>>>>>> various alarms sound (we do anomaly detection on the time series).
>>>>>>>>
>>>>>>>> Please create a Jira ticket for us to follow, or we could do it.
>>>>>>>>
>>>>>>>>
>>>>>>>> PS: Not aborting on checkpoint failure, up to a configurable limit, is
>>>>>>>> very important too.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <
>>>>>>>> aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>>>>
>>>>>>>>> With your last mail you're basically saying that the checkpoint
>>>>>>>>> could not be restored because your HDFS was temporarily down. If
>>>>>>>>> Flink had not deleted that checkpoint it might have been possible to
>>>>>>>>> restore it at a later point, right?
>>>>>>>>>
>>>>>>>>> Regarding failed checkpoints killing the job: yes, this is
>>>>>>>>> currently the expected behaviour but there are plans to change this.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I think this is the offending piece. There is a catch-all for
>>>>>>>>> Exception, which IMHO should distinguish a recoverable exception from
>>>>>>>>> an unrecoverable one.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> try {
>>>>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>>>>     if (completedCheckpoint != null) {
>>>>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>>>>     }
>>>>>>>>> } catch (Exception e) {
>>>>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>>>>         "checkpoint store.", e);
>>>>>>>>>     // remove the checkpoint with broken state handle
>>>>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> So this is the issue, and tell us if it is wrong. ZK had some
>>>>>>>>>> state (backed by HDFS) that referred to a checkpoint (the exact same
>>>>>>>>>> last checkpoint that was successful before the NN screwed us). When
>>>>>>>>>> the JM tried to recreate the state, it failed to retrieve the CHK
>>>>>>>>>> handle from HDFS because the NN was down, and conveniently (and I
>>>>>>>>>> think very wrongly) removed the CHK from being considered and cleaned
>>>>>>>>>> the pointer (though that failed too, as the NN was down, which is
>>>>>>>>>> obvious from the dangling file in recovery). The metadata itself was
>>>>>>>>>> on HDFS, and a failure in retrieving it should have been a stop-all,
>>>>>>>>>> rather than trying to do magic in the exception handler and starting
>>>>>>>>>> from a blank state.
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve
>>>>>>>>>> checkpoint 44286 from state handle under /0000000000000044286.
>>>>>>>>>> This indicates that the retrieved state handle is broken. Try
>>>>>>>>>> cleaning the state handle store.
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Also note that the ZooKeeper recovery directory (sadly on the same
>>>>>>>>>>> HDFS cluster) showed the same behavior. It had the pointers to the
>>>>>>>>>>> chk point (I think that is what it does: it keeps metadata of where
>>>>>>>>>>> the checkpoint is, etc.). It too decided to keep the recovery file
>>>>>>>>>>> from the failed state.
>>>>>>>>>>>
>>>>>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>>>
>>>>>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>>>
>>>>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Another thing I noted was this thing
>>>>>>>>>>>>
>>>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>>>>
>>>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Generally what Flink does, IMHO, is replace the chk point
>>>>>>>>>>>> directory with a new one. I see it happening now: every minute it
>>>>>>>>>>>> replaces the old directory. In this job's case, however, it did not
>>>>>>>>>>>> delete the 2017-10-04 13:54 directory, i.e. chk-44286. This was (I
>>>>>>>>>>>> think) the last checkpoint successfully created before the NN had
>>>>>>>>>>>> issues, but, contrary to what is usual, Flink did not delete this
>>>>>>>>>>>> chk-44286. It looks as if it started with a blank slate? Does this
>>>>>>>>>>>> strike a chord?
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> First of all, congratulations on this fabulous framework. I have
>>>>>>>>>>>>> worked with GDF, and though GDF has some natural pluses, Flink's
>>>>>>>>>>>>> state management is far more advanced. With Kafka as a source it
>>>>>>>>>>>>> negates issues GDF has (GDF integration with pub/sub is organic,
>>>>>>>>>>>>> and that is to be expected, but non-FIFO pub/sub is an issue with
>>>>>>>>>>>>> windows on event time, etc.).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Coming back to this issue: we have that same Kafka topic feeding a
>>>>>>>>>>>>> streaming Druid datasource and we do not see any issue there, so
>>>>>>>>>>>>> data loss on the source, Kafka, is not applicable. I am totally
>>>>>>>>>>>>> certain that the "retention" time was not an issue. It is 4 days
>>>>>>>>>>>>> of retention and we fixed this issue within 30 minutes. We could
>>>>>>>>>>>>> replay Kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>>>
>>>>>>>>>>>>> * setCommitOffsetsOnCheckpoints(boolean) for the Kafka consumers
>>>>>>>>>>>>> is at its default, true. I bring this up to see whether Flink will
>>>>>>>>>>>>> in any circumstance drive consumption from the offset Kafka
>>>>>>>>>>>>> perceives rather than the one in the checkpoint.
>>>>>>>>>>>>>
>>>>>>>>>>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state
>>>>>>>>>>>>> is big enough, though, that IMHO there is no way it is stored along
>>>>>>>>>>>>> with the metadata in the JM (or ZK?). The reason I bring this up is
>>>>>>>>>>>>> to make sure that when you say the size has to be less than 1024
>>>>>>>>>>>>> bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>>>>>>
>>>>>>>>>>>>> * We have a good sense of SP (savepoint) and CP (checkpoint) and
>>>>>>>>>>>>> certainly understand that they actually are not dissimilar.
>>>>>>>>>>>>> However, in this case there were multiple attempts to restart the
>>>>>>>>>>>>> pipe before it finally succeeded.
>>>>>>>>>>>>>
>>>>>>>>>>>>> * Other HDFS-related properties:
>>>>>>>>>>>>>
>>>>>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Do these make sense? Is there anything else I should look at?
>>>>>>>>>>>>> Please also note that this is the second time this has happened.
>>>>>>>>>>>>> The first time I was on vacation and was not privy to the state of
>>>>>>>>>>>>> the Flink pipeline, but the net effect was similar: the counts for
>>>>>>>>>>>>> the first window after an internal restart dropped.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you for your patience and regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Vishal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <
>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> window operators are always stateful because the operator
>>>>>>>>>>>>>> needs to remember previously received events (WindowFunction) or
>>>>>>>>>>>>>> intermediate results (ReduceFunction).
>>>>>>>>>>>>>> Given the program you described, a checkpoint should include
>>>>>>>>>>>>>> the Kafka consumer offset and the state of the window operator.
>>>>>>>>>>>>>> If the program eventually successfully (i.e., without an error)
>>>>>>>>>>>>>> recovered from the last checkpoint, all its state should have
>>>>>>>>>>>>>> been restored. Since the last checkpoint was before HDFS went
>>>>>>>>>>>>>> into safe mode, the program would have been reset to that point.
>>>>>>>>>>>>>> If the Kafka retention time is less than the time it took to fix
>>>>>>>>>>>>>> HDFS you would have lost data because it would have been removed
>>>>>>>>>>>>>> from Kafka. If that's not the case, we need to investigate this
>>>>>>>>>>>>>> further because a checkpoint recovery must not result in state
>>>>>>>>>>>>>> loss.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Restoring from a savepoint is not so much different from
>>>>>>>>>>>>>> automatic checkpoint recovery. Given that you have a completed
>>>>>>>>>>>>>> savepoint, you can restart the job from that point. The main
>>>>>>>>>>>>>> difference is that checkpoints are only used for internal
>>>>>>>>>>>>>> recovery and usually discarded once the job is terminated while
>>>>>>>>>>>>>> savepoints are retained.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding your question if a failed checkpoint should cause
>>>>>>>>>>>>>> the job to fail and recover I'm not sure what the current status
>>>>>>>>>>>>>> is.
>>>>>>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As far as I know checkpoint failure should be ignored and
>>>>>>>>>>>>>>>> retried with potentially larger state. I had this situation
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>>>>>>>>>>> * exception was thrown
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>>>>>>>>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>>>>>>>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>>>>>>>>>>     ..................
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * The pipeline came back after a few restarts and
>>>>>>>>>>>>>>>> checkpoint failures, after the hdfs issues were resolved.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would not have worried about the restart, but it was
>>>>>>>>>>>>>>>> evident that I lost my operator state. Either my Kafka consumer
>>>>>>>>>>>>>>>> kept on advancing its offset between a start and the next
>>>>>>>>>>>>>>>> checkpoint failure (a minute's worth), or the operator state that
>>>>>>>>>>>>>>>> held partial aggregates was lost. I have a 15 minute window of
>>>>>>>>>>>>>>>> counts on a keyed operator.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am using RocksDB and of course have checkpointing turned
>>>>>>>>>>>>>>>> on.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The questions thus are
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>>>>> * Why was the operator state not recreated on restart?
>>>>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with
>>>>>>>>>>>>>>>> this, because suspend and resume from a savepoint work as expected?
>>>>>>>>>>>>>>>> * And though I am pretty sure: are operators like the Window
>>>>>>>>>>>>>>>> operator stateful by default, so that if I have
>>>>>>>>>>>>>>>> timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()),
>>>>>>>>>>>>>>>> the state is managed by Flink?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>