[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394544#comment-16394544 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5656 merged > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394545#comment-16394545 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/5656 > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392682#comment-16392682 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5656 Looks good to me now. +1 > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391619#comment-16391619 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5656 rebased > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391556#comment-16391556 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r173227736 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -223,6 +224,81 @@ } } + /** +* Retry the given operation with the given delay in between successful completions where the +* result does not match a given predicate. +* +* @param operation to retry +* @param retryDelay delay between retries +* @param deadline A deadline that specifies at what point we should stop retrying +* @param acceptancePredicate Predicate to test whether the result is acceptable +* @param scheduledExecutor executor to be used for the retry operation +* @param type of the result +* @return Future which retries the given operation a given amount of times and delays the retry +* in case the predicate isn't matched +*/ + public static CompletableFuture retrySuccesfulWithDelay( + final Supplieroperation, + final Time retryDelay, --- End diff -- The reason is that the other methods in this class use `Time`, but I can switch the new ones to `Duration`. > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391533#comment-16391533 ] Aljoscha Krettek commented on FLINK-8487: - ITCase added on release-1.4 in 72b3ddad1df86f383ea4f75e3dd597a07374df65 > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391534#comment-16391534 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5654 merged > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391512#comment-16391512 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/5655 > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391429#comment-16391429 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/5654 > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391428#comment-16391428 ] Aljoscha Krettek commented on FLINK-8487: - ITCase added on release-1.3 in b6cfe7baaf22cd0ab6d3f57ef247434e1ec44f6f > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391268#comment-16391268 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r173164690 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391261#comment-16391261 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5654 @StephanEwen pushed some changes > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391171#comment-16391171 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r173143151 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391169#comment-16391169 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r173142937 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391167#comment-16391167 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r173142759 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391117#comment-16391117 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r173134402 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391115#comment-16391115 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r173134300 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390207#comment-16390207 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172979459 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -223,6 +224,81 @@ } } + /** +* Retry the given operation with the given delay in between successful completions where the +* result does not match a given predicate. +* +* @param operation to retry +* @param retryDelay delay between retries +* @param deadline A deadline that specifies at what point we should stop retrying +* @param acceptancePredicate Predicate to test whether the result is acceptable +* @param scheduledExecutor executor to be used for the retry operation +* @param type of the result +* @return Future which retries the given operation a given amount of times and delays the retry +* in case the predicate isn't matched +*/ + public static CompletableFuture retrySuccesfulWithDelay( + final Supplieroperation, + final Time retryDelay, --- End diff -- Deadline uses `Duration`, this method uses `Time`. > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390208#comment-16390208 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172980585 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { --- End diff -- I have similar comments for this test as for #5654 - the test is fins in general, but can probably be simplified slightly, plus a bit of code style cleanup. > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390205#comment-16390205 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172980021 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390206#comment-16390206 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172978813 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.time; + + +import org.apache.flink.annotation.Internal; + +import java.time.Duration; + +/** + * This class stores a deadline, as obtained via {@link #now()} or from {@link #plus(Duration)}. + */ +@Internal +public class Deadline { + private final Duration time; + + private Deadline(Duration time) { + this.time = time; + } + + public Deadline plus(Duration other) { + return new Deadline(time.plus(other)); + } + + /** +* Returns the time left between the deadline and now. +*/ + public Duration timeLeft() { + return time.minus(Duration.ofNanos(System.nanoTime())); --- End diff -- Is this expected to go negative, or simply stay at 0 when overdue? > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390204#comment-16390204 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172978556 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.time; + + +import org.apache.flink.annotation.Internal; + +import java.time.Duration; + +/** + * This class stores a deadline, as obtained via {@link #now()} or from {@link #plus(Duration)}. + */ +@Internal +public class Deadline { + private final Duration time; --- End diff -- I find this a bit confusing to use `Duration` here, because it really does not hold a duration, but an absolute point in time (in the future) evaluated against `System.nanoTime()`. I would simply use a `long deadlineNanos` here, which also makes the `isOverdue()` check (the most frequent one) cheaper. You can (and should) still use `Duration` for the arithmetic (adding time, etc) - simply convert to nanos. > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390175#comment-16390175 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172977111 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390174#comment-16390174 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5655 This code is (except for lambdas) identical to #5654 Please apply the same changes here as to the other PR (with regard to the review comments). > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390153#comment-16390153 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172963487 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390154#comment-16390154 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972943 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390152#comment-16390152 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972120 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390156#comment-16390156 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972454 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390155#comment-16390155 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972736 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390157#comment-16390157 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972678 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389742#comment-16389742 ] ASF GitHub Bot commented on FLINK-8487: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172893483 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389740#comment-16389740 ] ASF GitHub Bot commented on FLINK-8487: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172893235 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389702#comment-16389702 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172882773 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389701#comment-16389701 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172882495 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389651#comment-16389651 ] ASF GitHub Bot commented on FLINK-8487: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172857716 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389652#comment-16389652 ] ASF GitHub Bot commented on FLINK-8487: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5656 We shouldn't look at this one anyways because we can't merge it on master unless the other stuff is in. Because the test will not work. > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389650#comment-16389650 ] ASF GitHub Bot commented on FLINK-8487: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172856770 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389606#comment-16389606 ] ASF GitHub Bot commented on FLINK-8487: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5656 should probably clean this branch from the restore/QS tests, and rebase it once. > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389591#comment-16389591 ] ASF GitHub Bot commented on FLINK-8487: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5656 [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase (master/1.5) R: @zentol @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-8487-zookeeper-it-case Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5656.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5656 commit a0cbd2433bd90806ee1639d8fd7745cf6ecd5eab Author: Aljoscha KrettekDate: 2018-02-26T09:12:44Z [FLINK-8758] Add getters to JobDetailsInfo commit f21d3d95677cf4c9e10b44027b556e5063e8ac4c Author: Aljoscha Krettek Date: 2018-02-26T11:29:23Z Add proper toString() on JsonResponse in RestClient commit 903febcf255dc5e86e1f306e8d3c12f6f3f6ad3b Author: Aljoscha Krettek Date: 2018-02-26T10:44:57Z [FLINK-8757] Add MiniClusterResource.getClusterClient() commit 4e47f877b6d51b7f1d22bd3acf11cf8b9a5b3744 Author: Aljoscha Krettek Date: 2018-02-26T10:52:50Z [FLINK-8758] Make non-blocking ClusterClient.submitJob() public commit 6a1a0d58b2c9000f5cf7d489a01debd9b7e32e4d Author: Aljoscha Krettek Date: 2018-02-26T10:53:47Z [FLINK-8758] Add ClusterClient.getJobStatus() commit 80581cf3f4847d889655c3ff070ebf73c772a03c Author: Aljoscha Krettek Date: 2018-02-27T12:40:51Z [FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay() This retries getting a result until it matches a given predicate or until we run out of retries. commit c7d298e261942a8b145b7c47f2eedbc597b75ea4 Author: Aljoscha Krettek Date: 2018-02-28T14:06:59Z Add our own Deadline implementation commit b09cd9b92b7f3d3eda07964e139dc8dba7b01116 Author: Aljoscha Krettek Date: 2018-02-27T12:42:09Z [FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource commit 46ebb586074ac35f153a9b2e98880b29d51f0abb Author: Aljoscha Krettek Date: 2018-02-26T10:55:14Z [FLINK-8778] Port queryable state ITCases to use MiniClusterResource commit 7c382e45243ee96b8d64715ca5d10e3822add7d7 Author: Aljoscha Krettek Date: 2018-03-03T08:34:56Z [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase commit d7bf4070d03985b54c479d05c8c0f5534b820e61 Author: Aljoscha Krettek Date: 2018-03-06T15:29:48Z Incorporate Stephan suggestions > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389583#comment-16389583 ] ASF GitHub Bot commented on FLINK-8487: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5655 [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase R: @zentol @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-8487-zookeeper-it-case-release-14 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5655.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5655 commit b93b8c407580fff60158b0e5390fb4036a5c90ba Author: Aljoscha KrettekDate: 2018-03-03T08:34:56Z [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389580#comment-16389580 ] ASF GitHub Bot commented on FLINK-8487: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5654 [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase R: @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-8487-zookeeper-it-case-release-13 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5654.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5654 commit 62c6695c61c628ef842c6a29dbb517d71e50ca59 Author: Aljoscha KrettekDate: 2018-03-03T08:34:56Z [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357294#comment-16357294 ] Tzu-Li (Gordon) Tai commented on FLINK-8487: [~aljoscha] can you confirm? > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335551#comment-16335551 ] Stefan Richter commented on FLINK-8487: --- Afaik [~aljoscha] already fixed this in FLINK-7783? > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)