[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-11 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/5656


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r173227736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -223,6 +224,81 @@
}
}
 
+   /**
+    * Retry the given operation with the given delay in between successful completions where the
+    * result does not match a given predicate.
+    *
+    * @param operation to retry
+    * @param retryDelay delay between retries
+    * @param deadline A deadline that specifies at what point we should stop retrying
+    * @param acceptancePredicate Predicate to test whether the result is acceptable
+    * @param scheduledExecutor executor to be used for the retry operation
+    * @param <T> type of the result
+    * @return Future which retries the given operation a given amount of times and delays the retry
+    *   in case the predicate isn't matched
+    */
+   public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+           final Supplier<CompletableFuture<T>> operation,
+           final Time retryDelay,
--- End diff --

The reason is that the other methods in this class use `Time`, but I can 
switch the new ones to `Duration`.
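For illustration, a `Duration`-based variant could look roughly like the sketch below. It uses only JDK types so that it is self-contained; the class name `RetrySketch`, the method name `retryUntilAccepted`, the `timeout` parameter (standing in for the `Deadline` argument) and the use of `TimeoutException` are assumptions made for this example, not the code in the pull request.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch of a retry helper that takes Duration instead of Time. */
final class RetrySketch {

    /**
     * Re-runs the operation while it completes successfully with a result
     * that the predicate rejects, waiting retryDelay between attempts.
     */
    static <T> CompletableFuture<T> retryUntilAccepted(
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            Duration timeout,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Absolute deadline on the System.nanoTime() timeline.
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        attempt(result, operation, retryDelay, deadlineNanos, acceptancePredicate, executor);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            long deadlineNanos,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService executor) {
        if (result.isDone()) {
            return; // for example, the caller cancelled the returned future
        }
        operation.get().whenComplete((value, failure) -> {
            if (failure != null) {
                // Only successful completions are retried; failures propagate.
                result.completeExceptionally(failure);
            } else if (acceptancePredicate.test(value)) {
                result.complete(value);
            } else if (deadlineNanos - System.nanoTime() <= 0) {
                result.completeExceptionally(
                        new TimeoutException("no acceptable result before the deadline"));
            } else {
                executor.schedule(
                        () -> attempt(result, operation, retryDelay, deadlineNanos,
                                acceptancePredicate, executor),
                        retryDelay.toNanos(),
                        TimeUnit.NANOSECONDS);
            }
        });
    }
}
```

With `Duration` on both the delay and the timeout, callers would not need to convert between two time types at the call site.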


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r173164690
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   zkServer = new TestingServer();
+
+   Configuration config = new Configuration();
+   config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172980585
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
--- End diff --

I have similar comments for this test as for #5654 - the test is fine in 
general, but can probably be simplified slightly, plus a bit of code style 
cleanup.


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172979459
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -223,6 +224,81 @@
}
}
 
+   /**
+    * Retry the given operation with the given delay in between successful completions where the
+    * result does not match a given predicate.
+    *
+    * @param operation to retry
+    * @param retryDelay delay between retries
+    * @param deadline A deadline that specifies at what point we should stop retrying
+    * @param acceptancePredicate Predicate to test whether the result is acceptable
+    * @param scheduledExecutor executor to be used for the retry operation
+    * @param <T> type of the result
+    * @return Future which retries the given operation a given amount of times and delays the retry
+    *   in case the predicate isn't matched
+    */
+   public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+           final Supplier<CompletableFuture<T>> operation,
+           final Time retryDelay,
--- End diff --

Deadline uses `Duration`, this method uses `Time`.


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172980021
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172978556
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.time;
+
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/**
+ * This class stores a deadline, as obtained via {@link #now()} or from {@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+   private final Duration time;
--- End diff --

I find this a bit confusing to use `Duration` here, because it really does 
not hold a duration, but an absolute point in time (in the future) evaluated 
against `System.nanoTime()`. I would simply use a `long deadlineNanos` here, 
which also makes the `isOverdue()` check (the most frequent one) cheaper.

You can (and should) still use `Duration` for the arithmetic (adding time, 
etc) - simply convert to nanos.
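A minimal sketch of that suggestion, assuming a hypothetical class name `NanoDeadline` so it is not mistaken for the class in the diff: the absolute point in time is held as a `long` on the `System.nanoTime()` timeline, while `Duration` is still used for the arithmetic at the API boundary.

```java
import java.time.Duration;

/** Hypothetical sketch, not the Deadline class from the pull request. */
final class NanoDeadline {

    // Absolute point in time, measured on the System.nanoTime() timeline.
    private final long deadlineNanos;

    private NanoDeadline(long deadlineNanos) {
        this.deadlineNanos = deadlineNanos;
    }

    /** A deadline that is due right now. */
    static NanoDeadline now() {
        return new NanoDeadline(System.nanoTime());
    }

    /** Shifts the deadline by the given duration, converted to nanoseconds. */
    NanoDeadline plus(Duration other) {
        return new NanoDeadline(deadlineNanos + other.toNanos());
    }

    /**
     * Compares via the difference of the two readings, as the
     * System.nanoTime() Javadoc recommends, so numeric overflow
     * of the counter does not break the check.
     */
    boolean isOverdue() {
        return deadlineNanos - System.nanoTime() < 0;
    }

    /** Time left until the deadline; negative once the deadline has passed. */
    Duration timeLeft() {
        return Duration.ofNanos(deadlineNanos - System.nanoTime());
    }
}
```

In this form `isOverdue()` is a single subtraction and comparison on primitives, with no `Duration` allocation on the hot path.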



---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172978813
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.time;
+
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/**
+ * This class stores a deadline, as obtained via {@link #now()} or from {@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+   private final Duration time;
+
+   private Deadline(Duration time) {
+   this.time = time;
+   }
+
+   public Deadline plus(Duration other) {
+   return new Deadline(time.plus(other));
+   }
+
+   /**
+* Returns the time left between the deadline and now.
+*/
+   public Duration timeLeft() {
+   return time.minus(Duration.ofNanos(System.nanoTime()));
--- End diff --

Is this expected to go negative, or simply stay at 0 when overdue?
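To make the two options concrete, here is a small sketch with the clock reading passed in explicitly so the behaviour is deterministic. The class `TimeLeftSketch`, both method names and the explicit `now` parameter are assumptions for illustration only. `Duration.minus` does not clamp, so the subtraction as written in the diff would yield a negative `Duration` once the deadline has passed.

```java
import java.time.Duration;

/** Hypothetical helpers contrasting the two possible contracts of timeLeft(). */
final class TimeLeftSketch {

    /** Same subtraction as in the diff: negative once now is past the deadline. */
    static Duration timeLeft(Duration deadline, Duration now) {
        return deadline.minus(now);
    }

    /** Alternative contract: stays at zero when the deadline is overdue. */
    static Duration timeLeftClamped(Duration deadline, Duration now) {
        Duration left = deadline.minus(now);
        return left.isNegative() ? Duration.ZERO : left;
    }
}
```

Whichever contract is chosen, stating it in the Javadoc of `timeLeft()` would spare callers from having to guess.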


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172893483
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172893235
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172882773
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   zkServer = new TestingServer();
+
+   Configuration config = new Configuration();
+   config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172882495
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172857716
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172856770
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5656

 [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase (master/1.5)

R: @zentol @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-8487-zookeeper-it-case

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5656.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5656


commit a0cbd2433bd90806ee1639d8fd7745cf6ecd5eab
Author: Aljoscha Krettek 
Date:   2018-02-26T09:12:44Z

[FLINK-8758] Add getters to JobDetailsInfo

commit f21d3d95677cf4c9e10b44027b556e5063e8ac4c
Author: Aljoscha Krettek 
Date:   2018-02-26T11:29:23Z

Add proper toString() on JsonResponse in RestClient

commit 903febcf255dc5e86e1f306e8d3c12f6f3f6ad3b
Author: Aljoscha Krettek 
Date:   2018-02-26T10:44:57Z

[FLINK-8757] Add MiniClusterResource.getClusterClient()

commit 4e47f877b6d51b7f1d22bd3acf11cf8b9a5b3744
Author: Aljoscha Krettek 
Date:   2018-02-26T10:52:50Z

[FLINK-8758] Make non-blocking ClusterClient.submitJob() public

commit 6a1a0d58b2c9000f5cf7d489a01debd9b7e32e4d
Author: Aljoscha Krettek 
Date:   2018-02-26T10:53:47Z

[FLINK-8758] Add ClusterClient.getJobStatus()

commit 80581cf3f4847d889655c3ff070ebf73c772a03c
Author: Aljoscha Krettek 
Date:   2018-02-27T12:40:51Z

[FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay()

This retries getting a result until it matches a given predicate or
until we run out of retries.

commit c7d298e261942a8b145b7c47f2eedbc597b75ea4
Author: Aljoscha Krettek 
Date:   2018-02-28T14:06:59Z

Add our own Deadline implementation

commit b09cd9b92b7f3d3eda07964e139dc8dba7b01116
Author: Aljoscha Krettek 
Date:   2018-02-27T12:42:09Z

[FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource

commit 46ebb586074ac35f153a9b2e98880b29d51f0abb
Author: Aljoscha Krettek 
Date:   2018-02-26T10:55:14Z

[FLINK-8778] Port queryable state ITCases to use MiniClusterResource

commit 7c382e45243ee96b8d64715ca5d10e3822add7d7
Author: Aljoscha Krettek 
Date:   2018-03-03T08:34:56Z

[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

commit d7bf4070d03985b54c479d05c8c0f5534b820e61
Author: Aljoscha Krettek 
Date:   2018-03-06T15:29:48Z

Incorporate Stephan suggestions
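
The commit "[FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay()" above describes retrying an operation until its result matches a given predicate. A minimal sketch of that idea in plain Java follows. It is not the code from this pull request: the class name RetryUntilAcceptedSketch, the nanosecond deadline parameter, and the choice to fail with a TimeoutException are all assumptions made for illustration, and it uses only java.util.concurrent rather than Flink's own Time, Deadline, or executor types.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical stand-in for illustration; not Flink's FutureUtils. */
final class RetryUntilAcceptedSketch {

    /**
     * Runs the operation repeatedly, waiting retryDelay between attempts,
     * until a result passes acceptancePredicate or the deadline has passed.
     * deadlineNanos is an absolute System.nanoTime() value (an assumption).
     */
    static <T> CompletableFuture<T> retrySuccessfulWithDelay(
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            long deadlineNanos,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService scheduledExecutor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(result, operation, retryDelay, deadlineNanos,
                acceptancePredicate, scheduledExecutor);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            long deadlineNanos,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService scheduledExecutor) {
        if (result.isDone()) {
            return; // the caller cancelled or completed the future already
        }
        operation.get().whenComplete((value, failure) -> {
            if (failure != null) {
                // A failed attempt is not retried here; only unaccepted results are.
                result.completeExceptionally(failure);
            } else if (acceptancePredicate.test(value)) {
                result.complete(value);
            } else if (System.nanoTime() - deadlineNanos >= 0) {
                result.completeExceptionally(
                        new TimeoutException("no acceptable result before the deadline"));
            } else {
                // Result was not acceptable yet: try again after the delay.
                scheduledExecutor.schedule(
                        () -> attempt(result, operation, retryDelay, deadlineNanos,
                                acceptancePredicate, scheduledExecutor),
                        retryDelay.toMillis(), TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            long deadline = System.nanoTime() + Duration.ofSeconds(5).toNanos();
            // The operation yields 1, 2, 3, ...; values of 3 or more are accepted.
            int accepted = RetryUntilAcceptedSketch.<Integer>retrySuccessfulWithDelay(
                    () -> CompletableFuture.completedFuture(calls.incrementAndGet()),
                    Duration.ofMillis(10),
                    deadline,
                    n -> n >= 3,
                    executor).get();
            System.out.println("accepted " + accepted + " after " + calls.get() + " calls");
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In a test such as the ITCase in this pull request, a helper of this shape could plausibly be used to poll a job status until it reaches an expected state, though how the actual test wires it up is not shown in the commit list.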




---