[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/5656 ---
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r173227736 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -223,6 +224,81 @@ } } + /** +* Retry the given operation with the given delay in between successful completions where the +* result does not match a given predicate. +* +* @param operation to retry +* @param retryDelay delay between retries +* @param deadline A deadline that specifies at what point we should stop retrying +* @param acceptancePredicate Predicate to test whether the result is acceptable +* @param scheduledExecutor executor to be used for the retry operation +* @param type of the result +* @return Future which retries the given operation a given amount of times and delays the retry +* in case the predicate isn't matched +*/ + public static CompletableFuture retrySuccesfulWithDelay( + final Supplieroperation, + final Time retryDelay, --- End diff -- The reason is that the other methods in this class use `Time`, but I can switch the new ones to `Duration`. ---
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r173164690 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172980585 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { --- End diff -- I have similar comments for this test as for #5654 - the test is fins in general, but can probably be simplified slightly, plus a bit of code style cleanup. ---
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172979459 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -223,6 +224,81 @@ } } + /** +* Retry the given operation with the given delay in between successful completions where the +* result does not match a given predicate. +* +* @param operation to retry +* @param retryDelay delay between retries +* @param deadline A deadline that specifies at what point we should stop retrying +* @param acceptancePredicate Predicate to test whether the result is acceptable +* @param scheduledExecutor executor to be used for the retry operation +* @param type of the result +* @return Future which retries the given operation a given amount of times and delays the retry +* in case the predicate isn't matched +*/ + public static CompletableFuture retrySuccesfulWithDelay( + final Supplieroperation, + final Time retryDelay, --- End diff -- Deadline uses `Duration`, this method uses `Time`. ---
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172980021 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172978556 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.time; + + +import org.apache.flink.annotation.Internal; + +import java.time.Duration; + +/** + * This class stores a deadline, as obtained via {@link #now()} or from {@link #plus(Duration)}. + */ +@Internal +public class Deadline { + private final Duration time; --- End diff -- I find this a bit confusing to use `Duration` here, because it really does not hold a duration, but an absolute point in time (in the future) evaluated against `System.nanoTime()`. I would simply use a `long deadlineNanos` here, which also makes the `isOverdue()` check (the most frequent one) cheaper. You can (and should) still use `Duration` for the arithmetic (adding time, etc) - simply convert to nanos. ---
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172978813 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.time; + + +import org.apache.flink.annotation.Internal; + +import java.time.Duration; + +/** + * This class stores a deadline, as obtained via {@link #now()} or from {@link #plus(Duration)}. + */ +@Internal +public class Deadline { + private final Duration time; + + private Deadline(Duration time) { + this.time = time; + } + + public Deadline plus(Duration other) { + return new Deadline(time.plus(other)); + } + + /** +* Returns the time left between the deadline and now. +*/ + public Duration timeLeft() { + return time.minus(Duration.ofNanos(System.nanoTime())); --- End diff -- Is this expected to go negative, or simply stay at 0 when overdue? ---
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172893483 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172893235 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172882773 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172882495 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172857716 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172856770 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5656 [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase (master/1.5) R: @zentol @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-8487-zookeeper-it-case Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5656.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5656 commit a0cbd2433bd90806ee1639d8fd7745cf6ecd5eab Author: Aljoscha KrettekDate: 2018-02-26T09:12:44Z [FLINK-8758] Add getters to JobDetailsInfo commit f21d3d95677cf4c9e10b44027b556e5063e8ac4c Author: Aljoscha Krettek Date: 2018-02-26T11:29:23Z Add proper toString() on JsonResponse in RestClient commit 903febcf255dc5e86e1f306e8d3c12f6f3f6ad3b Author: Aljoscha Krettek Date: 2018-02-26T10:44:57Z [FLINK-8757] Add MiniClusterResource.getClusterClient() commit 4e47f877b6d51b7f1d22bd3acf11cf8b9a5b3744 Author: Aljoscha Krettek Date: 2018-02-26T10:52:50Z [FLINK-8758] Make non-blocking ClusterClient.submitJob() public commit 6a1a0d58b2c9000f5cf7d489a01debd9b7e32e4d Author: Aljoscha Krettek Date: 2018-02-26T10:53:47Z [FLINK-8758] Add ClusterClient.getJobStatus() commit 80581cf3f4847d889655c3ff070ebf73c772a03c Author: Aljoscha Krettek Date: 2018-02-27T12:40:51Z [FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay() This retries getting a result until it matches a given predicate or until we run out of retries. commit c7d298e261942a8b145b7c47f2eedbc597b75ea4 Author: Aljoscha Krettek Date: 2018-02-28T14:06:59Z Add our own Deadline implementation commit b09cd9b92b7f3d3eda07964e139dc8dba7b01116 Author: Aljoscha Krettek Date: 2018-02-27T12:42:09Z [FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource commit 46ebb586074ac35f153a9b2e98880b29d51f0abb Author: Aljoscha Krettek Date: 2018-02-26T10:55:14Z [FLINK-8778] Port queryable state ITCases to use MiniClusterResource commit 7c382e45243ee96b8d64715ca5d10e3822add7d7 Author: Aljoscha Krettek Date: 2018-03-03T08:34:56Z [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase commit d7bf4070d03985b54c479d05c8c0f5534b820e61 Author: Aljoscha Krettek Date: 2018-03-06T15:29:48Z Incorporate Stephan suggestions ---