Yes. We have not hit the snag in 1.4.0 (our current version). Again,
though, this occurs under sustained downtime on Hadoop, and it has been
more stable lately :)

On Wed, Mar 7, 2018 at 4:09 PM, Stephan Ewen <se...@apache.org> wrote:

> The assumption in your previous mail is correct.
>
> Just to double check:
>
>   - The initially affected version you were running was 1.3.2, correct?
>
> The issue should be fixed in all active branches (1.4, 1.5, 1.6) and
> additionally in 1.3.
>
> Currently released versions with this fix: 1.4.0, 1.4.1
> 1.5.0 is in the making.
>
> We are looking to create a dedicated 1.3.3 for this fix.
>
>
> On Thu, Jan 25, 2018 at 5:13 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> To add to this, we are assuming that the default configuration will fail
>> a pipeline if a checkpoint fails, and will enter the recovery loop if and
>> only if the retry limit has not been reached.
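>>
>> For reference, a minimal sketch of how that could be pinned down explicitly
>> rather than relying on the defaults (Flink 1.3/1.4-era API; the interval and
>> retry numbers below are placeholders, not our actual settings):
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // a failed checkpoint currently fails the job, which then goes through the
>> // configured restart strategy; checkpoint every minute
>> env.enableCheckpointing(60000);
>> // give up for good after 5 restart attempts, waiting 30 seconds between attempts
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(30, TimeUnit.SECONDS)));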
>>
>>
>>
>>
>> On Thu, Jan 25, 2018 at 7:00 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Sorry.
>>>
>>> There are 2 scenarios:
>>>
>>>   * Idempotent Sinks use case, where we would want to restore from the
>>> latest valid checkpoint. If I understand the code correctly, we try to
>>> retrieve the completed checkpoints for all handles in ZK and abort (throw
>>> an exception) if there are handles but no corresponding complete
>>> checkpoints in HDFS; otherwise we use the latest valid checkpoint state. On
>>> abort, a restart (and thus a restore of the pipe) is issued, repeating the
>>> above execution. If the failure in HDFS was transient, a retry will succeed;
>>> otherwise, when the retry limit is reached, the pipeline is aborted for good.
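>>>
>>> Roughly, the decision described above looks like this (a sketch only, with
>>> hypothetical names, not the actual Flink classes):
>>>
>>> // If ZK lists checkpoint handles but none of them can be read back from
>>> // HDFS, fail recovery instead of silently starting from a blank state.
>>> List<CompletedCheckpoint> readable = new ArrayList<>();
>>> for (CheckpointHandle handle : handlesInZk) {
>>>     try {
>>>         readable.add(handle.retrieve());   // hits HDFS
>>>     } catch (Exception e) {
>>>         // broken handle or transient HDFS failure; keep the handle in ZK
>>>     }
>>> }
>>> if (!handlesInZk.isEmpty() && readable.isEmpty()) {
>>>     throw new FlinkException("Handles exist in ZK but no checkpoint could be read");
>>> }
>>> // otherwise restore from the latest element of 'readable'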
>>>
>>>
>>> * Non-idempotent Sinks, where we have no retries. We do not want to
>>> recover from the last available checkpoint, as the above code will do, because
>>> the further back into history we go, the more duplicates will be delivered. The
>>> only solution is to use the exactly-once semantics of the source and sinks, if
>>> possible.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jan 24, 2018 at 7:20 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Did you see my second mail?
>>>>
>>>>
>>>> On 24. Jan 2018, at 12:50, Vishal Santoshi <vishal.santo...@gmail.com>
>>>> wrote:
>>>>
>>>> As in, if there are checkpoint handles in ZK, there should be no reason to
>>>> start a new job (bad handle, no HDFS connectivity, etc.);
>>>>  yes, that sums it up.
>>>>
>>>> On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Wait a sec, I just checked out the code again and it seems we already
>>>>> do that: https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>>>>>
>>>>> If there were some checkpoints but none could be read, we fail recovery.
>>>>>
>>>>>
>>>>> On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org>
>>>>> wrote:
>>>>>
>>>>> That sounds reasonable: we would keep the first fix, i.e. never delete
>>>>> checkpoints if they're "corrupt", only when they're subsumed. Additionally,
>>>>> we fail the job if there are some checkpoints in ZooKeeper but none of them
>>>>> can be restored, to prevent the case where a job starts from scratch even
>>>>> though it shouldn't.
>>>>>
>>>>> Does that sum it up?
>>>>>
>>>>> On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> If we hit the retry limit, abort the job. In our case we will restart
>>>>> from the last SP (we, like any production pipeline, take one n times a day),
>>>>> and that, I would think, should be OK for most folks?
>>>>>
>>>>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Thank you for considering this. If I understand you correctly:
>>>>>>
>>>>>> * The CHK pointer on ZK for a CHK state on HDFS was written successfully.
>>>>>> * Some issue restarted the pipeline.
>>>>>> * The NN was unfortunately down, and Flink could not retrieve the CHK
>>>>>> state from the CHK pointer on ZK.
>>>>>>
>>>>>> Before
>>>>>>
>>>>>> * The CHK pointer was being removed and the job started from a brand
>>>>>> new slate.
>>>>>>
>>>>>> After (this fix on 1.4+)
>>>>>>
>>>>>> * Do not delete the CHK pointer (it has to be subsumed to be deleted).
>>>>>> * Flink keeps using this CHK pointer (if retries > 0 and until we hit
>>>>>> the retry limit) to restore state.
>>>>>> * The NN comes back.
>>>>>> * Flink restores state on the next retry.
>>>>>>
>>>>>> I would hope that is the sequence to follow.
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <
>>>>>> aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> I think you might be right. We fixed the problem that checkpoints
>>>>>>> were dropped via https://issues.apache.org/jira/browse/FLINK-7783.
>>>>>>> However, we still have the problem that if the DFS is not up at all, then it
>>>>>>> will look as if the job is starting from scratch. The alternative is
>>>>>>> failing the job, in which case you will also never be able to restore
>>>>>>> from a checkpoint. What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>
>>>>>>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>> Sorry for the late reply.
>>>>>>>
>>>>>>> I created FLINK-8487 [1] to track this problem.
>>>>>>>
>>>>>>> @Vishal, can you have a look and check if I forgot some details? I
>>>>>>> logged the issue for Flink 1.3.2, is that correct?
>>>>>>> Please add more information if you think it is relevant.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Fabian
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>>>>>>
>>>>>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> Or this one
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>>>>>>
>>>>>>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> ping.
>>>>>>>>>
>>>>>>>>>     This happened again in production, and it seems reasonable to
>>>>>>>>> abort when a checkpoint is not found rather than behave as if it is a
>>>>>>>>> brand new pipeline.
>>>>>>>>>
>>>>>>>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Folks, sorry for being late on this. Could somebody with knowledge of
>>>>>>>>>> this code base create a JIRA issue for the above? We have seen this more
>>>>>>>>>> than once in production.
>>>>>>>>>>
>>>>>>>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <
>>>>>>>>>> aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> Some relevant Jira issues for you are:
>>>>>>>>>>>
>>>>>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow
>>>>>>>>>>> skipping failed checkpoints
>>>>>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>>>>>>>>>> fallback to earlier checkpoint when checkpoint restore fails
>>>>>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>>>>>>>>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Aljoscha
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> it would be great if you could create a JIRA ticket with Blocker
>>>>>>>>>>> priority.
>>>>>>>>>>> Please add all relevant information of your detailed analysis,
>>>>>>>>>>> add a link to this email thread (see [1] for the web archive of the 
>>>>>>>>>>> mailing
>>>>>>>>>>> list), and post the id of the JIRA issue here.
>>>>>>>>>>>
>>>>>>>>>>> Thanks for looking into this!
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Fabian
>>>>>>>>>>>
>>>>>>>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>>>>>>>
>>>>>>>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you for confirming.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  I think this is a critical bug. In essence, any checkpoint
>>>>>>>>>>>> store (HDFS/S3/file) will lose state if it is unavailable at resume.
>>>>>>>>>>>> This becomes all the more painful with your confirming that "failed
>>>>>>>>>>>> checkpoints kill the job", because essentially it means that if the remote
>>>>>>>>>>>> store is unavailable during a checkpoint, then you have lost state (unless
>>>>>>>>>>>> of course you have retries set to none, or an unbounded retry delay, a
>>>>>>>>>>>> delay within which you *hope* the store revives). Remember, the first retry
>>>>>>>>>>>> failure will cause new state, according to the code as written, iff the
>>>>>>>>>>>> remote store is down. We would rather have a configurable property that
>>>>>>>>>>>> establishes our desire to abort, something like an
>>>>>>>>>>>> "abort_retry_on_chkretrevalfailure"
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> In our case it is very important that we do not undercount a
>>>>>>>>>>>> window (one reason we use Flink and its awesome failure guarantees), as
>>>>>>>>>>>> various alarms sound (we do anomaly detection on the time series).
>>>>>>>>>>>>
>>>>>>>>>>>> Please create a JIRA ticket for us to follow, or we could do it.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> PS: Not aborting on checkpoint failure, up to a configurable limit, is
>>>>>>>>>>>> very important too.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <
>>>>>>>>>>>> aljos...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think you're right! And thanks for looking into this so
>>>>>>>>>>>>> deeply.
>>>>>>>>>>>>>
>>>>>>>>>>>>> With your last mail you're basically saying that the checkpoint
>>>>>>>>>>>>> could not be restored because your HDFS was temporarily down. If Flink had
>>>>>>>>>>>>> not deleted that checkpoint, it might have been possible to restore it at a
>>>>>>>>>>>>> later point, right?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding failed checkpoints killing the job: yes, this is
>>>>>>>>>>>>> currently the expected behaviour but there are plans to change 
>>>>>>>>>>>>> this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think this is the offending piece. There is a catch-all
>>>>>>>>>>>>> Exception handler, which IMHO should distinguish a recoverable exception
>>>>>>>>>>>>> from an unrecoverable one.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> try {
>>>>>>>>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>>>>>>>>     if (completedCheckpoint != null) {
>>>>>>>>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>>>>>>>>         "checkpoint store.", e);
>>>>>>>>>>>>>     // remove the checkpoint with broken state handle
>>>>>>>>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So this is the issue; tell us if it is wrong. ZK had
>>>>>>>>>>>>>> some state (backed by HDFS) that referred to a checkpoint (the same
>>>>>>>>>>>>>> exact last checkpoint that was successful before the NN screwed us).
>>>>>>>>>>>>>> When the JM tried to recreate the state, it failed to retrieve the CHK
>>>>>>>>>>>>>> handle from HDFS because the NN was down, and conveniently (and, I think,
>>>>>>>>>>>>>> very wrongly) removed the CHK from being considered and cleaned up the
>>>>>>>>>>>>>> pointer (though that too failed, as the NN was down, which is obvious from
>>>>>>>>>>>>>> the dangling file in recovery). The metadata itself was on HDFS, and a
>>>>>>>>>>>>>> failure in retrieving it should have been a stop-all, not an attempt to do
>>>>>>>>>>>>>> magic with the exception and then start from a blank state.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve
>>>>>>>>>>>>>> checkpoint 44286 from state handle under
>>>>>>>>>>>>>> /0000000000000044286. This indicates that the retrieved state 
>>>>>>>>>>>>>> handle is
>>>>>>>>>>>>>> broken. Try cleaning the state handle store.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also note that the ZooKeeper recovery (sadly on the same HDFS
>>>>>>>>>>>>>>> cluster) also showed the same behavior. It had the pointers to the
>>>>>>>>>>>>>>> checkpoint (I think that is what it does: keeps metadata of where the
>>>>>>>>>>>>>>> checkpoint is, etc.). It too decided to keep the recovery file from the
>>>>>>>>>>>>>>> failed state.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>>>>>>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>>>>>>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Another thing I noted was this thing
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Generally, what Flink does IMHO is replace the checkpoint
>>>>>>>>>>>>>>>> directory with a new one. I see it happening now: every minute it
>>>>>>>>>>>>>>>> replaces the old directory. In this job's case, however, it did not delete
>>>>>>>>>>>>>>>> the 2017-10-04 13:54 directory, and hence the chk-44286 directory remained.
>>>>>>>>>>>>>>>> That was (I think) the last checkpoint successfully created before the NN
>>>>>>>>>>>>>>>> had issues, and, unlike usual, it was not deleted. It looks as if the job
>>>>>>>>>>>>>>>> started with a blank slate???????? Does this strike a chord?????
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>>>>>>>>                       First of all, congratulations on this
>>>>>>>>>>>>>>>>> fabulous framework. I have worked with GDF, and though GDF has some natural
>>>>>>>>>>>>>>>>> pluses, Flink's state management is far more advanced. With Kafka as a
>>>>>>>>>>>>>>>>> source it negates issues GDF has (GDF integration with pub/sub is organic,
>>>>>>>>>>>>>>>>> and that is to be expected, but non-FIFO pub/sub is an issue with windows
>>>>>>>>>>>>>>>>> on event time, etc.).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>                    Coming back to this issue. We have the
>>>>>>>>>>>>>>>>> same Kafka topic feeding a streaming Druid datasource, and we do not see
>>>>>>>>>>>>>>>>> any issue there, so data loss at the source (Kafka) is not applicable. I am
>>>>>>>>>>>>>>>>> totally certain that the "retention" time was not an issue. It is 4 days
>>>>>>>>>>>>>>>>> of retention, and we fixed this issue within 30 minutes. We could replay
>>>>>>>>>>>>>>>>> Kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for the Kafka
>>>>>>>>>>>>>>>>> consumers is at its default, true. I bring this up to see whether Flink
>>>>>>>>>>>>>>>>> will under any circumstance drive consumption from the Kafka-perceived
>>>>>>>>>>>>>>>>> offset rather than the one in the checkpoint.
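>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (For reference, this is the setting I mean; just a sketch, with a
>>>>>>>>>>>>>>>>> placeholder topic, schema and properties, assuming the 0.10 connector:)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> // Offsets are committed back to Kafka on checkpoint completion, but a
>>>>>>>>>>>>>>>>> // restore is expected to use the offsets stored in the checkpoint, not
>>>>>>>>>>>>>>>>> // the ones Kafka itself has.
>>>>>>>>>>>>>>>>> FlinkKafkaConsumer010<String> consumer =
>>>>>>>>>>>>>>>>>         new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), kafkaProps);
>>>>>>>>>>>>>>>>> consumer.setCommitOffsetsOnCheckpoints(true);  // the default being discussed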
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been
>>>>>>>>>>>>>>>>> set. The state is big enough, though, that IMHO there is no way the state
>>>>>>>>>>>>>>>>> is stored along with the metadata in the JM (or ZK?). The reason I bring
>>>>>>>>>>>>>>>>> this up is to make sure that when you say the size has to be less than
>>>>>>>>>>>>>>>>> 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * We have a good sense of SP (savepoint) and CP
>>>>>>>>>>>>>>>>> (checkpoint) and certainly understand that they are not actually
>>>>>>>>>>>>>>>>> dissimilar. However, in this case there were multiple attempts to restart
>>>>>>>>>>>>>>>>> the pipe before it finally succeeded.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * Other HDFS-related properties:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Do these make sense? Is there anything else I should look
>>>>>>>>>>>>>>>>> at? Please also note that this is the second time this has happened. The
>>>>>>>>>>>>>>>>> first time I was vacationing and was not privy to the state of the Flink
>>>>>>>>>>>>>>>>> pipeline, but the net effect was similar: the counts for the first window
>>>>>>>>>>>>>>>>> after an internal restart dropped.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thank you for your patience and regards,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Vishal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <
>>>>>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> window operators are always stateful because the operator
>>>>>>>>>>>>>>>>>> needs to remember previously received events 
>>>>>>>>>>>>>>>>>> (WindowFunction) or
>>>>>>>>>>>>>>>>>> intermediate results (ReduceFunction).
>>>>>>>>>>>>>>>>>> Given the program you described, a checkpoint should
>>>>>>>>>>>>>>>>>> include the Kafka consumer offset and the state of the 
>>>>>>>>>>>>>>>>>> window operator. If
>>>>>>>>>>>>>>>>>> the program eventually successfully (i.e., without an error) 
>>>>>>>>>>>>>>>>>> recovered from
>>>>>>>>>>>>>>>>>> the last checkpoint, all its state should have been 
>>>>>>>>>>>>>>>>>> restored. Since the
>>>>>>>>>>>>>>>>>> last checkpoint was before HDFS went into safe mode, the 
>>>>>>>>>>>>>>>>>> program would have
>>>>>>>>>>>>>>>>>> been reset to that point. If the Kafka retention time is 
>>>>>>>>>>>>>>>>>> less than the time
>>>>>>>>>>>>>>>>>> it took to fix HDFS you would have lost data because it 
>>>>>>>>>>>>>>>>>> would have been
>>>>>>>>>>>>>>>>>> removed from Kafka. If that's not the case, we need to 
>>>>>>>>>>>>>>>>>> investigate this
>>>>>>>>>>>>>>>>>> further because a checkpoint recovery must not result in 
>>>>>>>>>>>>>>>>>> state loss.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Restoring from a savepoint is not so much different from
>>>>>>>>>>>>>>>>>> automatic checkpoint recovery. Given that you have a 
>>>>>>>>>>>>>>>>>> completed savepoint,
>>>>>>>>>>>>>>>>>> you can restart the job from that point. The main difference 
>>>>>>>>>>>>>>>>>> is that
>>>>>>>>>>>>>>>>>> checkpoints are only used for internal recovery and usually 
>>>>>>>>>>>>>>>>>> discarded once
>>>>>>>>>>>>>>>>>> the job is terminated while savepoints are retained.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding your question whether a failed checkpoint should
>>>>>>>>>>>>>>>>>> cause the job to fail and recover, I'm not sure what the current status
>>>>>>>>>>>>>>>>>> is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>>>>>>>>
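>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> (For context, the surrounding setup is roughly the following; just a
>>>>>>>>>>>>>>>>>>> sketch, with a made-up topic name, schema, window sizes and sink standing
>>>>>>>>>>>>>>>>>>> in for our own ReduceFunction/WindowFunction implementations:)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>> env.enableCheckpointing(60000);  // checkpoint every minute
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> env.addSource(new FlinkKafkaConsumer010<>("events", new EventSchema(), kafkaProps))
>>>>>>>>>>>>>>>>>>>         .keyBy(0)
>>>>>>>>>>>>>>>>>>>         .timeWindow(Time.of(15, TimeUnit.MINUTES))
>>>>>>>>>>>>>>>>>>>         .allowedLateness(Time.of(60, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>>>>>         .reduce(new CountReducer(), new CountWindowFunction())
>>>>>>>>>>>>>>>>>>>         .addSink(new IdempotentSink());
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> env.execute("windowed-counts");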
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> As far as I know, a checkpoint failure should be ignored
>>>>>>>>>>>>>>>>>>>> and retried with potentially larger state. I had this situation:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>>>>>>>>>>>>>>> * exception was thrown
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>>>>>>>>>>>>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>>>>>>>>>>>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>>>>>>>>>>>>>>     ..................
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>>>>>>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> * The pipeline came back after a few restarts and
>>>>>>>>>>>>>>>>>>>> checkpoint failures, after the hdfs issues were resolved.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I would not have worried about the restart, but it was
>>>>>>>>>>>>>>>>>>>> evident that I lost my operator state. Either my Kafka consumer kept on
>>>>>>>>>>>>>>>>>>>> advancing its offset between a start and the next checkpoint failure (a
>>>>>>>>>>>>>>>>>>>> minute's worth), or the operator that had partial aggregates was lost. I
>>>>>>>>>>>>>>>>>>>> have a 15-minute window of counts on a keyed operator.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am using RocksDB and of course have checkpointing
>>>>>>>>>>>>>>>>>>>> turned on.
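>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> (Roughly this, as a sketch; the checkpoint path and interval below are
>>>>>>>>>>>>>>>>>>>> placeholders, not the production values:)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> // RocksDB state backend with checkpoint data written out to HDFS, plus
>>>>>>>>>>>>>>>>>>>> // periodic checkpointing; the constructor declares IOException.
>>>>>>>>>>>>>>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>> env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
>>>>>>>>>>>>>>>>>>>> env.enableCheckpointing(60000);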
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The questions thus are
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>>>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>>>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with
>>>>>>>>>>>>>>>>>>>> this, given that suspend and resume from a savepoint work as expected?
>>>>>>>>>>>>>>>>>>>> * And though I am pretty sure, are operators like the
>>>>>>>>>>>>>>>>>>>> window operator stateful by default, so that if I have
>>>>>>>>>>>>>>>>>>>> timeWindow(Time.of(window_size,
>>>>>>>>>>>>>>>>>>>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the
>>>>>>>>>>>>>>>>>>>> state is managed by Flink?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
