[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394544#comment-16394544
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5656
  
merged


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394545#comment-16394545
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/5656


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392682#comment-16392682
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5656
  
Looks good to me now.

+1


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391619#comment-16391619
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5656
  
rebased



> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391556#comment-16391556
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r173227736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -223,6 +224,81 @@
}
}
 
+   /**
+* Retry the given operation with the given delay in between successful 
completions where the
+* result does not match a given predicate.
+*
+* @param operation to retry
+* @param retryDelay delay between retries
+* @param deadline A deadline that specifies at what point we should 
stop retrying
+* @param acceptancePredicate Predicate to test whether the result is 
acceptable
+* @param scheduledExecutor executor to be used for the retry operation
+* @param  type of the result
+* @return Future which retries the given operation a given amount of 
times and delays the retry
+*   in case the predicate isn't matched
+*/
+   public static  CompletableFuture retrySuccesfulWithDelay(
+   final Supplier operation,
+   final Time retryDelay,
--- End diff --

The reason is that the other methods in this class use `Time`, but I can 
switch the new ones to `Duration`.


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391533#comment-16391533
 ] 

Aljoscha Krettek commented on FLINK-8487:
-

ITCase added on release-1.4 in
72b3ddad1df86f383ea4f75e3dd597a07374df65

> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391534#comment-16391534
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5654
  
merged


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391512#comment-16391512
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/5655


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391429#comment-16391429
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/5654


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391428#comment-16391428
 ] 

Aljoscha Krettek commented on FLINK-8487:
-

ITCase added on release-1.3 in
b6cfe7baaf22cd0ab6d3f57ef247434e1ec44f6f

> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391268#comment-16391268
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r173164690
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391261#comment-16391261
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5654
  
@StephanEwen pushed some changes


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391171#comment-16391171
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r173143151
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391169#comment-16391169
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r173142937
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391167#comment-16391167
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r173142759
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391117#comment-16391117
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r173134402
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391115#comment-16391115
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r173134300
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390207#comment-16390207
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172979459
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -223,6 +224,81 @@
}
}
 
+   /**
+* Retry the given operation with the given delay in between successful 
completions where the
+* result does not match a given predicate.
+*
+* @param operation to retry
+* @param retryDelay delay between retries
+* @param deadline A deadline that specifies at what point we should 
stop retrying
+* @param acceptancePredicate Predicate to test whether the result is 
acceptable
+* @param scheduledExecutor executor to be used for the retry operation
+* @param  type of the result
+* @return Future which retries the given operation a given amount of 
times and delays the retry
+*   in case the predicate isn't matched
+*/
+   public static  CompletableFuture retrySuccesfulWithDelay(
+   final Supplier operation,
+   final Time retryDelay,
--- End diff --

Deadline uses `Duration`, this method uses `Time`.


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390208#comment-16390208
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172980585
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
--- End diff --

I have similar comments for this test as for #5654 - the test is fins in 
general, but can probably be simplified slightly, plus a bit of code style 
cleanup.


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390205#comment-16390205
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172980021
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390206#comment-16390206
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172978813
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.time;
+
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/**
+ * This class stores a deadline, as obtained via {@link #now()} or from 
{@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+   private final Duration time;
+
+   private Deadline(Duration time) {
+   this.time = time;
+   }
+
+   public Deadline plus(Duration other) {
+   return new Deadline(time.plus(other));
+   }
+
+   /**
+* Returns the time left between the deadline and now.
+*/
+   public Duration timeLeft() {
+   return time.minus(Duration.ofNanos(System.nanoTime()));
--- End diff --

Is this expected to go negative, or simply stay at 0 when overdue?


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390204#comment-16390204
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172978556
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.time;
+
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/**
+ * This class stores a deadline, as obtained via {@link #now()} or from 
{@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+   private final Duration time;
--- End diff --

I find this a bit confusing to use `Duration` here, because it really does 
not hold a duration, but an absolute point in time (in the future) evaluated 
against `System.nanoTime()`. I would simply use a `long deadlineNanos` here, 
which also makes the `isOverdue()` check (the most frequent one) cheaper.

You can (and should) still use `Duration` for the arithmetic (adding time, 
etc) - simply convert to nanos.



> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390175#comment-16390175
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172977111
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390174#comment-16390174
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5655
  
This code is (except for lambdas) identical to #5654
Please apply the same changes here as to the other PR (with regard to the 
review comments).


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390153#comment-16390153
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172963487
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390154#comment-16390154
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972943
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390152#comment-16390152
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972120
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390156#comment-16390156
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972454
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390155#comment-16390155
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972736
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390157#comment-16390157
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972678
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389742#comment-16389742
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172893483
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389740#comment-16389740
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172893235
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389702#comment-16389702
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172882773
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389701#comment-16389701
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172882495
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389651#comment-16389651
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172857716
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389652#comment-16389652
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5656
  
We shouldn't look at this one anyways because we can't merge it on master 
unless the other stuff is in. Because the test will not work.


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389650#comment-16389650
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172856770
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389606#comment-16389606
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5656
  
should probably clean this branch from the restore/QS tests, and rebase it 
once.


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389591#comment-16389591
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5656

 [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase 
(master/1.5)

R: @zentol @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-8487-zookeeper-it-case

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5656.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5656


commit a0cbd2433bd90806ee1639d8fd7745cf6ecd5eab
Author: Aljoscha Krettek 
Date:   2018-02-26T09:12:44Z

[FLINK-8758] Add getters to JobDetailsInfo

commit f21d3d95677cf4c9e10b44027b556e5063e8ac4c
Author: Aljoscha Krettek 
Date:   2018-02-26T11:29:23Z

Add proper toString() on JsonResponse in RestClient

commit 903febcf255dc5e86e1f306e8d3c12f6f3f6ad3b
Author: Aljoscha Krettek 
Date:   2018-02-26T10:44:57Z

[FLINK-8757] Add MiniClusterResource.getClusterClient()

commit 4e47f877b6d51b7f1d22bd3acf11cf8b9a5b3744
Author: Aljoscha Krettek 
Date:   2018-02-26T10:52:50Z

[FLINK-8758] Make non-blocking ClusterClient.submitJob() public

commit 6a1a0d58b2c9000f5cf7d489a01debd9b7e32e4d
Author: Aljoscha Krettek 
Date:   2018-02-26T10:53:47Z

[FLINK-8758] Add ClusterClient.getJobStatus()

commit 80581cf3f4847d889655c3ff070ebf73c772a03c
Author: Aljoscha Krettek 
Date:   2018-02-27T12:40:51Z

[FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay()

This retries getting a result until it matches a given predicate or
until we run out of retries.

commit c7d298e261942a8b145b7c47f2eedbc597b75ea4
Author: Aljoscha Krettek 
Date:   2018-02-28T14:06:59Z

Add our own Deadline implementation

commit b09cd9b92b7f3d3eda07964e139dc8dba7b01116
Author: Aljoscha Krettek 
Date:   2018-02-27T12:42:09Z

[FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource

commit 46ebb586074ac35f153a9b2e98880b29d51f0abb
Author: Aljoscha Krettek 
Date:   2018-02-26T10:55:14Z

[FLINK-8778] Port queryable state ITCases to use MiniClusterResource

commit 7c382e45243ee96b8d64715ca5d10e3822add7d7
Author: Aljoscha Krettek 
Date:   2018-03-03T08:34:56Z

[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

commit d7bf4070d03985b54c479d05c8c0f5534b820e61
Author: Aljoscha Krettek 
Date:   2018-03-06T15:29:48Z

Incorporate Stephan suggestions




> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389583#comment-16389583
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5655

[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

R: @zentol @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-8487-zookeeper-it-case-release-14

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5655


commit b93b8c407580fff60158b0e5390fb4036a5c90ba
Author: Aljoscha Krettek 
Date:   2018-03-03T08:34:56Z

[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase




> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389580#comment-16389580
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5654

[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

R: @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-8487-zookeeper-it-case-release-13

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5654.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5654


commit 62c6695c61c628ef842c6a29dbb517d71e50ca59
Author: Aljoscha Krettek 
Date:   2018-03-03T08:34:56Z

[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase




> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357294#comment-16357294
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8487:


[~aljoscha] can you confirm?

> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-01-23 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335551#comment-16335551
 ] 

Stefan Richter commented on FLINK-8487:
---

Afaik [~aljoscha] already fixed this in FLINK-7783?

> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing it's offset between a start and the next 
> checkpoint failure (a minute's worth) or the the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)