ping.

    This happened again on production and it seems reasonable to abort when
a checkpoint is not found rather than behave as if it is a brand new
pipeline.

On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Folks sorry for being late on this. Can some body with the knowledge of
> this code base create a jira issue for the above ? We have seen this more
> than once on production.
>
> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Vishal,
>>
>> Some relevant Jira issues for you are:
>>
>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>> failed checkpoints
>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback
>> to earlier checkpoint when checkpoint restore fails
>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove
>> checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>
>> Best,
>> Aljoscha
>>
>>
>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Vishal,
>>
>> it would be great if you could create a JIRA ticket with Blocker priority.
>> Please add all relevant information of your detailed analysis, add a link
>> to this email thread (see [1] for the web archive of the mailing list), and
>> post the id of the JIRA issue here.
>>
>> Thanks for looking into this!
>>
>> Best regards,
>> Fabian
>>
>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>
>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> Thank you for confirming.
>>>
>>>
>>>  I think this is a critical bug. In essence any checkpoint store (
>>> hdfs/S3/File)  will loose state if it is unavailable at resume. This
>>> becomes all the more painful with your confirming that  "failed
>>> checkpoints killing the job"  b'coz essentially it mean that if remote
>>> store in unavailable  during checkpoint than you have lost state ( till of
>>> course you have a retry of none or an unbounded retry delay, a delay that
>>> you *hope* the store revives in ) .. Remember  the first retry failure
>>>  will cause new state according the code as written iff the remote store is
>>> down. We would rather have a configurable property that establishes  our
>>> desire to abort something like a "abort_retry_on_chkretrevalfailure"
>>>
>>>
>>> In our case it is very important that we do not undercount a window, one
>>> reason we use flink and it's awesome failure guarantees, as various alarms
>>> sound ( we do anomaly detection on the time series ).
>>>
>>> Please create a jira ticket for us to follow or we could do it.
>>>
>>>
>>> PS Not aborting on checkpointing, till a configurable limit is very
>>> important too.
>>>
>>>
>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> I think you're right! And thanks for looking into this so deeply.
>>>>
>>>> With your last mail your basically saying, that the checkpoint could
>>>> not be restored because your HDFS was temporarily down. If Flink had not
>>>> deleted that checkpoint it might have been possible to restore it at a
>>>> later point, right?
>>>>
>>>> Regarding failed checkpoints killing the job: yes, this is currently
>>>> the expected behaviour but there are plans to change this.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com>
>>>> wrote:
>>>>
>>>> I think this is the offending piece. There is a catch all Exception,
>>>> which IMHO should understand a recoverable exception from an unrecoverable
>>>> on.
>>>>
>>>>
>>>> try {
>>>> completedCheckpoint = retrieveCompletedCheckpoint(ch
>>>> eckpointStateHandle);
>>>> if (completedCheckpoint != null) {
>>>> completedCheckpoints.add(completedCheckpoint);
>>>> }
>>>> } catch (Exception e) {
>>>> LOG.warn("Could not retrieve checkpoint. Removing it from the
>>>> completed " +
>>>> "checkpoint store.", e);
>>>> // remove the checkpoint with broken state handle
>>>> removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle
>>>> .f0);
>>>> }
>>>>
>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> So this is the issue and tell us that it is wrong. ZK had some state (
>>>>> backed by hdfs ) that referred to a checkpoint ( the same exact last
>>>>> successful checkpoint that was successful before NN screwed us ). When the
>>>>> JM tried to recreate the state and b'coz NN was down failed to retrieve 
>>>>> the
>>>>> CHK handle from hdfs and conveniently ( and I think very wrongly ) removed
>>>>> the CHK from being considered and cleaned the pointer ( though failed as
>>>>> was NN was down and is obvious from the dangling file in recovery ) . The
>>>>> metadata itself was on hdfs and failure in retrieving should have been a
>>>>> stop all, not going to trying doing magic exception rather than starting
>>>>> from a blank state.
>>>>>
>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>>>> 44286 from state handle under /0000000000000044286. This indicates that 
>>>>> the
>>>>> retrieved state handle is broken. Try cleaning the state handle store.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Also note that  the zookeeper recovery did  ( sadly on the same hdfs
>>>>>> cluster ) also showed the same behavior. It had the pointers to the chk
>>>>>> point  ( I  think that is what it does, keeps metadata of where the
>>>>>> checkpoint etc  ) .  It too decided to keep the recovery file from the
>>>>>> failed state.
>>>>>>
>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>
>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>
>>>>>> This is getting a little interesting. What say you :)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Another thing I noted was this thing
>>>>>>>
>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>
>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>
>>>>>>>
>>>>>>> Generally what Flink does IMHO is that it replaces the chk point
>>>>>>> directory with a new one. I see it happening now. Every minute it 
>>>>>>> replaces
>>>>>>> the old directory.  In this job's case however, it did not delete the
>>>>>>> 2017-10-04 13:54  and hence the chk-44286 directory.  This was the last
>>>>>>> chk-44286 (  I think  )  successfully created before NN had issues but 
>>>>>>> as
>>>>>>> is usual did not delete this  chk-44286. It looks as if it started with 
>>>>>>> a
>>>>>>> blank slate ???????? Does this strike a chord ?????
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Fabian,
>>>>>>>>                       First of all congratulations on this fabulous
>>>>>>>> framework. I have worked with GDF and though GDF has some natural 
>>>>>>>> pluses
>>>>>>>> Flink's state management is far more advanced. With kafka as a source 
>>>>>>>> it
>>>>>>>> negates issues GDF has ( GDF integration with pub/sub is organic and 
>>>>>>>> that
>>>>>>>> is to be expected but non FIFO pub/sub is an issue with windows on 
>>>>>>>> event
>>>>>>>> time etc )
>>>>>>>>
>>>>>>>>                    Coming back to this issue. We have that same
>>>>>>>> kafka topic feeding a streaming druid datasource and we do not see any
>>>>>>>> issue there, so so data loss on the source, kafka is not applicable. I 
>>>>>>>> am
>>>>>>>> totally certain that the "retention" time was not an issue. It is
>>>>>>>> 4 days of retention and we fixed this issue within 30 minutes. We could
>>>>>>>> replay kafka with a new consumer group.id and that worked fine.
>>>>>>>>
>>>>>>>>
>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>
>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers
>>>>>>>> is the default true. I bring this up to see whether flink will in any
>>>>>>>> circumstance drive consumption on the kafka perceived offset rather 
>>>>>>>> than
>>>>>>>> the one in the checkpoint.
>>>>>>>>
>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set.  The
>>>>>>>> state is big enough though therefore IMHO no way the state is stored 
>>>>>>>> along
>>>>>>>> with the meta data in JM ( or ZK ? ) . The reason I bring this up is to
>>>>>>>> make sure when you say that the size has to be less than 1024bytes , 
>>>>>>>> you
>>>>>>>> are talking about cumulative state of the pipeine.
>>>>>>>>
>>>>>>>> * We have a good sense of SP ( save point )  and CP ( checkpoint )
>>>>>>>> and certainly understand that they actually are not dissimilar. 
>>>>>>>> However in
>>>>>>>> this case there were multiple attempts to restart the pipe before it
>>>>>>>> finally succeeded.
>>>>>>>>
>>>>>>>> * Other hdfs related poperties.
>>>>>>>>
>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%=
>>>>>>>> flink_hdfs_root %>
>>>>>>>>
>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>
>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= 
>>>>>>>> flink_hdfs_root %>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Do these make sense ? Is there anything else I should look at.
>>>>>>>> Please also note that it is the second time this has happened. The 
>>>>>>>> first
>>>>>>>> time I was vacationing and was not privy to the state of the flink
>>>>>>>> pipeline, but the net effect were similar. The counts for the first 
>>>>>>>> window
>>>>>>>> after an internal restart dropped.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you for you patience and regards,
>>>>>>>>
>>>>>>>> Vishal
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> window operators are always stateful because the operator needs to
>>>>>>>>> remember previously received events (WindowFunction) or intermediate
>>>>>>>>> results (ReduceFunction).
>>>>>>>>> Given the program you described, a checkpoint should include the
>>>>>>>>> Kafka consumer offset and the state of the window operator. If the 
>>>>>>>>> program
>>>>>>>>> eventually successfully (i.e., without an error) recovered from the 
>>>>>>>>> last
>>>>>>>>> checkpoint, all its state should have been restored. Since the last
>>>>>>>>> checkpoint was before HDFS went into safe mode, the program would 
>>>>>>>>> have been
>>>>>>>>> reset to that point. If the Kafka retention time is less than the 
>>>>>>>>> time it
>>>>>>>>> took to fix HDFS you would have lost data because it would have been
>>>>>>>>> removed from Kafka. If that's not the case, we need to investigate 
>>>>>>>>> this
>>>>>>>>> further because a checkpoint recovery must not result in state loss.
>>>>>>>>>
>>>>>>>>> Restoring from a savepoint is not so much different from automatic
>>>>>>>>> checkpoint recovery. Given that you have a completed savepoint, you 
>>>>>>>>> can
>>>>>>>>> restart the job from that point. The main difference is that 
>>>>>>>>> checkpoints
>>>>>>>>> are only used for internal recovery and usually discarded once the 
>>>>>>>>> job is
>>>>>>>>> terminated while savepoints are retained.
>>>>>>>>>
>>>>>>>>> Regarding your question if a failed checkpoint should cause the
>>>>>>>>> job to fail and recover I'm not sure what the current status is.
>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>
>>>>>>>>>> keyBy(0)
>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello folks,
>>>>>>>>>>>
>>>>>>>>>>> As far as I know checkpoint failure should be ignored and
>>>>>>>>>>> retried with potentially larger state. I had this situation
>>>>>>>>>>>
>>>>>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>>>>>> * exception was thrown
>>>>>>>>>>>
>>>>>>>>>>>     
>>>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>>>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>>>>>     ..................
>>>>>>>>>>>
>>>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(
>>>>>>>>>>> HadoopFileSystem.java:453)
>>>>>>>>>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.
>>>>>>>>>>> mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.
>>>>>>>>>>> FsCheckpointStreamFactory.createBasePath(FsCheck
>>>>>>>>>>> pointStreamFactory.java:132)
>>>>>>>>>>>
>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>>>>>> failures, after the hdfs issues were resolved.
>>>>>>>>>>>
>>>>>>>>>>> I would not have worried about the restart, but it was evident
>>>>>>>>>>> that I lost my operator state. Either it was my kafka consumer that 
>>>>>>>>>>> kept on
>>>>>>>>>>> advancing it's offset between a start and the next checkpoint 
>>>>>>>>>>> failure ( a
>>>>>>>>>>> minute's worth ) or the the operator that had partial aggregates 
>>>>>>>>>>> was lost.
>>>>>>>>>>> I have a 15 minute window of counts on a keyed operator
>>>>>>>>>>>
>>>>>>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>>>>>>
>>>>>>>>>>> The questions thus are
>>>>>>>>>>>
>>>>>>>>>>> * Should a pipeline be restarted if checkpoint fails ?
>>>>>>>>>>> * Why on restart did the operator state did not recreate ?
>>>>>>>>>>> * Is the nature of the exception thrown have to do with any of
>>>>>>>>>>> this b'coz suspend and resume from a save point work as expected ?
>>>>>>>>>>> * And though I am pretty sure, are operators like the Window
>>>>>>>>>>> operator stateful by drfault and thus if I have 
>>>>>>>>>>> timeWindow(Time.of(window_
>>>>>>>>>>> size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new
>>>>>>>>>>> WindowFunction()), the state is managed by flink ?
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>

Reply via email to