[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297567190
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static 

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297557240
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class contains scheduling logic for EAGER and LAZY_FROM_SOURCES.
+ * It is used for normal scheduling and legacy failover strategy re-scheduling.
+ */
+public class SchedulingUtils {
+
+   /**
+* Schedules the given vertices lazily. A vertex is scheduled only if its job
+* vertex reports isInputVertex() or its checkInputDependencyConstraints()
+* call returns true; all other vertices are skipped by this call.
+*
+* The method first calls executionGraph.assertRunningInJobMasterMainThread().
+* Every scheduled vertex is requested with LocationPreferenceConstraint.ANY
+* and the prior allocation ids computed up front for the whole vertex list.
+*
+* @param vertices A list of topologically sorted vertices to schedule.
+* @param executionGraph The graph the given vertices belong to.
+* @return The result of FutureUtils.waitForAll over the scheduling futures of
+* the vertices that were actually scheduled.
+*/
+   public static CompletableFuture scheduleLazy(
+   final List vertices,
+   final ExecutionGraph executionGraph) {
+
+   executionGraph.assertRunningInJobMasterMainThread();
+
+   final Set previousAllocations = 
computePriorAllocationIdsIfRequiredByScheduling(
+   vertices, executionGraph.getSlotProvider());
+
+   final ArrayList> schedulingFutures = 
new ArrayList<>(vertices.size());
+   for (ExecutionVertex executionVertex : vertices) {
+   // only schedule vertex when its input constraint is 
satisfied
+   if 
(executionVertex.getJobVertex().getJobVertex().isInputVertex() ||
+   
executionVertex.checkInputDependencyConstraints()) {
+
+   final CompletableFuture 
schedulingVertexFuture = executionVertex.scheduleForExecution(
+   executionGraph.getSlotProvider(),
+   
executionGraph.isQueuedSchedulingAllowed(),
+   LocationPreferenceConstraint.ANY,
+   previousAllocations);
+
+   schedulingFutures.add(schedulingVertexFuture);
+   }
+   }
+
+   return FutureUtils.waitForAll(schedulingFutures);
+   }
+
+   /**
+* Schedule vertices eagerly. That means all vertices will be scheduled at 
once.
+*
+* @param vertices A list of topologically sorted vertices to schedule.
+* @param executionGraph The graph the given vertices belong to.
+*/
+   public static CompletableFuture scheduleEager(
+   final List vertices,
+   final ExecutionGraph executionGraph) {
+
+   executionGraph.assertRunningInJobMasterMainThread();
+
+   checkState(executionGraph.getState() == JobStatus.RUNNING, "job 
is not running currently");
+
+   // Important: reserve all the space we need up front.
+   // that way we do not have any operation that can fail between 
allocating the slots
+   // and adding them

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297538465
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingC

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297508269
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a proper 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>())

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297510269
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingC

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297516088
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingC

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297508129
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>())

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297509973
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingC

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465153
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/runtime/RegionFailoverITCase.java
 ##
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case for testing region failover strategy.
+ */
+public class RegionFailoverITCase extends TestLogger {
 
 Review comment:
   I take it as an E2E test that ensures the job can succeed even when task 
failures happen while using the region failover strategy.
   IT cases do black-box verification, so it does not verify internal 
states (like which tasks are re-scheduled).
   It just gives us confidence that the Flink runtime works well in this 
scenario.
   WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297491370
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Records modifications of
+ * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex 
ExecutionVertices}, and allows
+ * for checking whether a vertex was modified.
+ *
+ * Examples for modifications include:
+ * 
+ * cancellation of the underlying execution
+ * deployment of the execution vertex
+ * 
+ *
+ * @see DefaultScheduler
+ */
+public class ExecutionVertexVersioner {
 
 Review comment:
   ExecutionVertexVersioner is from @GJL. He has also implemented tests for 
it: 
https://github.com/GJL/flink/blob/FLINK-12433/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java
   
   I've ported these tests and made some minor changes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297475345
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if is in global failover
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297476171
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if is in global failover
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297475345
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if is in global failover
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297466824
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
 
 Review comment:
   The TestAdaptedRestartPipelinedRegionStrategyNG needs the EG as a constructor 
param.
   So we cannot build it before the EG is generated.

--

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297470182
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>())

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465424
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
 
 Review comment:
   Making `testMainThreadUtil` static will cause it to be `null` in tests.
   The Resource is static because it is a ClassRule.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297466824
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
 
 Review comment:
   The TestAdaptedRestartPipelinedRegionStrategyNG needs the EG as a constructor 
param.


Th

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297466312
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Records modifications of
+ * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex 
ExecutionVertices}, and allows
+ * for checking whether a vertex was modified.
+ *
+ * Examples for modifications include:
+ * 
+ * cancellation of the underlying execution
+ * deployment of the execution vertex
+ * 
+ *
+ * @see DefaultScheduler
+ */
+public class ExecutionVertexVersioner {
 
 Review comment:
   Yes. Will add tests for it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297466134
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>())

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465694
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>())

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465424
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
 
 Review comment:
   Ok.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465153
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/runtime/RegionFailoverITCase.java
 ##
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case for testing region failover strategy.
+ */
+public class RegionFailoverITCase extends TestLogger {
 
 Review comment:
   I take it as an E2E test that ensures the job can succeed even when task 
failures happen while using the region failover strategy.
   IT cases do black-box verification, so this one does not verify internal 
state (like which tasks are re-scheduled).
   It just gives us confidence that the Flink runtime is working well in this 
scenario.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297463953
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 ##
 @@ -63,7 +63,7 @@
return new RestartAllStrategy.Factory();
 
case PIPELINED_REGION_RESTART_STRATEGY_NAME:
 
 Review comment:
   Ok. Let's make the adapted region failover strategy default in another PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297169235
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingC

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297166582
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGTest.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import 
org.apache.flink.runtime.executiongraph.failover.PipelinedFailoverRegionBuildingTest;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategyBuildingTest;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGTest extends TestLogger {
+
+   /**
+* Test that {@link AdaptedRestartPipelinedRegionStrategyNG} is loaded
+* as failover strategy if {@link 
JobManagerOptions#EXECUTION_FAILOVER_STRATEGY}
+* value is "region".
+*/
+   @Test
+   public void testAdaptedRestartPipelinedRegionStrategyNGLoading() throws 
Exception {
+   final ExecutionGraph eg = createExecutionGraph(new JobGraph());
+
+   assertTrue(eg.getFailoverStrategy() instanceof 
AdaptedRestartPipelinedRegionStrategyNG);
 
 Review comment:
   Fine. Let's omit this test as the loading behavior is actually covered by 
some other AdaptedRestartPipelinedRegionStrategyNG tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297165275
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGTest.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import 
org.apache.flink.runtime.executiongraph.failover.PipelinedFailoverRegionBuildingTest;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategyBuildingTest;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGTest extends TestLogger {
+
+   /**
+* Test that {@link AdaptedRestartPipelinedRegionStrategyNG} is loaded
+* as failover strategy if {@link 
JobManagerOptions#EXECUTION_FAILOVER_STRATEGY}
+* value is "region".
+*/
+   @Test
+   public void testAdaptedRestartPipelinedRegionStrategyNGLoading() throws 
Exception {
+   final ExecutionGraph eg = createExecutionGraph(new JobGraph());
+
+   assertTrue(eg.getFailoverStrategy() instanceof 
AdaptedRestartPipelinedRegionStrategyNG);
+   }
+
+   /**
+* Test whether region building works. This helps to make sure that the
+* overall region building process is not breaking.
+* More detailed region building verification is covered
+* in {@link RestartPipelinedRegionStrategyBuildingTest}
+* 
+* (v11) --> (v21) -+-> (v31) --> (v41)
+*  x
+* (v12) --> (v22) -+-> (v32) --> (v42)
+*
+*  ^
+*  |
+*  (blocking)
+* 
+* 4 regions. Each region has 2 pipelined connected vertices.
+*/
+   @Test
+   public void testRegionBuilding() throws Exception {
 
 Review comment:
   Ok.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297082648
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 ##
 @@ -63,7 +63,7 @@
return new RestartAllStrategy.Factory();
 
case PIPELINED_REGION_RESTART_STRATEGY_NAME:
 
 Review comment:
   Ok. Let's go this way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297080370
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
 ##
 @@ -34,22 +34,4 @@
 * @return set of IDs of vertices to restart
 */
Set getTasksNeedingRestart(ExecutionVertexID 
executionVertexId, Throwable cause);
-
-   // 

-   //  factory
-   // 

-
-   /**
-* The factory to instantiate {@link FailoverStrategy}.
-*/
-   interface Factory {
 
 Review comment:
   Ok. Let's exclude this change for now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297079565
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if is in global failover
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297071613
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -576,7 +576,7 @@ else if (numSources < parallelism) {
 */
public Execution resetForNewExecution(final long timestamp, final long 
originatingGlobalModVersion)
throws GlobalModVersionMismatch {
-   LOG.debug("Resetting execution vertex {} for new execution.", 
getTaskNameWithSubtaskIndex());
+   LOG.info("Resetting execution vertex {} to CREATED state for 
new execution.", getTaskNameWithSubtaskIndex());
 
 Review comment:
   Ok. Let's revert this change and do it later in a separate fix.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296994653
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
 ##
 @@ -34,22 +34,4 @@
 * @return set of IDs of vertices to restart
 */
Set getTasksNeedingRestart(ExecutionVertexID 
executionVertexId, Throwable cause);
-
-   // 

-   //  factory
-   // 

-
-   /**
-* The factory to instantiate {@link FailoverStrategy}.
-*/
-   interface Factory {
 
 Review comment:
   So far I think there is no need to create a JM-shared FailoverStrategy 
factory. Each JM/Scheduler can create its own FailoverStrategy directly.
   We should implement a loader util for creating a FailoverStrategy from the 
configuration, though.
   WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296999324
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedSchedulingUtils.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class contains scheduling logic similar to that in ExecutionGraph.
+ * It is used by legacy failover strategy as the strategy will do the 
re-scheduling work by itself.
+ * We extract it from ExecutionGraph to avoid affect existing scheduling logic.
+ */
+public class AdaptedSchedulingUtils {
+
+   static CompletableFuture scheduleLazy(
+   final Set vertices,
+   final ExecutionGraph executionGraph) {
+
+   final Set previousAllocations = 
getAllPriorAllocationIds(vertices);
+
+   final ArrayList> schedulingFutures = 
new ArrayList<>(vertices.size());
+   for (ExecutionVertex executionVertex : vertices) {
 
 Review comment:
   The above discusses LAZY scheduling. For EAGER mode, unordered scheduling 
increases the chance that downstream tasks get launched earlier. In this case 
downstream tasks will fail to request partitions and will trigger lots of 
partition state check RPCs to the JM, which may result in the JM main thread 
getting jammed.
   
   So I think it can be beneficial to order the vertices before scheduling, 
for both eager and lazy mode. Will do it in the next commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296998163
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedSchedulingUtils.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class contains scheduling logic similar to that in ExecutionGraph.
+ * It is used by legacy failover strategy as the strategy will do the 
re-scheduling work by itself.
+ * We extract it from ExecutionGraph to avoid affect existing scheduling logic.
+ */
+public class AdaptedSchedulingUtils {
+
+   static CompletableFuture scheduleLazy(
+   final Set vertices,
+   final ExecutionGraph executionGraph) {
+
+   final Set previousAllocations = 
getAllPriorAllocationIds(vertices);
+
+   final ArrayList> schedulingFutures = 
new ArrayList<>(vertices.size());
+   for (ExecutionVertex executionVertex : vertices) {
+   // only schedule vertex when its input constraint is 
satisfied
+   if 
(executionVertex.getJobVertex().getJobVertex().isInputVertex() ||
+   
executionVertex.checkInputDependencyConstraints()) {
+
+   final CompletableFuture 
schedulingVertexFuture = executionVertex.scheduleForExecution(
+   executionGraph.getSlotProvider(),
+   
executionGraph.isQueuedSchedulingAllowed(),
+   LocationPreferenceConstraint.ANY,
+   previousAllocations);
+
+   schedulingFutures.add(schedulingVertexFuture);
+   }
+   }
+
+   return FutureUtils.waitForAll(schedulingFutures);
+   }
+
+   static CompletableFuture scheduleEager(
 
 Review comment:
   I think it's a balance between simplicity, risk and maintainability.
   I'd be fine with having the EG call these methods to do scheduling.
   Will do this in the next commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296998163
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedSchedulingUtils.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class contains scheduling logic similar to that in ExecutionGraph.
+ * It is used by legacy failover strategy as the strategy will do the 
re-scheduling work by itself.
+ * We extract it from ExecutionGraph to avoid affect existing scheduling logic.
+ */
+public class AdaptedSchedulingUtils {
+
+   static CompletableFuture scheduleLazy(
+   final Set vertices,
+   final ExecutionGraph executionGraph) {
+
+   final Set previousAllocations = 
getAllPriorAllocationIds(vertices);
+
+   final ArrayList> schedulingFutures = 
new ArrayList<>(vertices.size());
+   for (ExecutionVertex executionVertex : vertices) {
+   // only schedule vertex when its input constraint is 
satisfied
+   if 
(executionVertex.getJobVertex().getJobVertex().isInputVertex() ||
+   
executionVertex.checkInputDependencyConstraints()) {
+
+   final CompletableFuture 
schedulingVertexFuture = executionVertex.scheduleForExecution(
+   executionGraph.getSlotProvider(),
+   
executionGraph.isQueuedSchedulingAllowed(),
+   LocationPreferenceConstraint.ANY,
+   previousAllocations);
+
+   schedulingFutures.add(schedulingVertexFuture);
+   }
+   }
+
+   return FutureUtils.waitForAll(schedulingFutures);
+   }
+
+   static CompletableFuture scheduleEager(
 
 Review comment:
   I think it's a balance between simplicity, risk and maintainability.
   I'd be fine with having the EG call these methods to do scheduling.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296994653
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
 ##
 @@ -34,22 +34,4 @@
 * @return set of IDs of vertices to restart
 */
Set getTasksNeedingRestart(ExecutionVertexID 
executionVertexId, Throwable cause);
-
-   // 

-   //  factory
-   // 

-
-   /**
-* The factory to instantiate {@link FailoverStrategy}.
-*/
-   interface Factory {
 
 Review comment:
   So far I think there is no need to create a JM-shared FailoverStrategy 
factory. Each JM/Scheduler can create its own failoverStrategy directly.
   We should implement a loader util for creating the failoverStrategy from 
the ExecutionGraph, though.
   WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296993735
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if is in global failover
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296992297
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
 
 Review comment:
   Sorry, I missed checking it. Will fix it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296989612
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedSchedulingUtils.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class contains scheduling logic similar to that in ExecutionGraph.
+ * It is used by legacy failover strategy as the strategy will do the 
re-scheduling work by itself.
+ * We extract it from ExecutionGraph to avoid affect existing scheduling logic.
+ */
+public class AdaptedSchedulingUtils {
+
+   static CompletableFuture scheduleLazy(
+   final Set vertices,
+   final ExecutionGraph executionGraph) {
+
+   final Set previousAllocations = 
getAllPriorAllocationIds(vertices);
+
+   final ArrayList> schedulingFutures = 
new ArrayList<>(vertices.size());
+   for (ExecutionVertex executionVertex : vertices) {
 
 Review comment:
   Theoretically, it should not cause any issues, as these vertices are 
scheduled individually and only vertices satisfying the input constraint will 
be scheduled. 
   
   In the implementation, however, there is a slightly higher chance that a 
downstream task gets launched earlier than its source tasks and causes a 
resource deadlock. But even scheduling topologically does not prevent this 
from happening. 
   To avoid resource deadlocks, users need to set `InputDependencyConstraint` 
to ALL. In that case, any scheduling order should be OK.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296987179
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 ##
 @@ -63,7 +63,7 @@
return new RestartAllStrategy.Factory();
 
case PIPELINED_REGION_RESTART_STRATEGY_NAME:
 
 Review comment:
   Shall we make AdaptedRestartPipelinedRegionStrategyNG the default "region" 
failover strategy and add a "region_legacy" option for 
RestartPipelinedRegionStrategy, since the legacy RestartPipelinedRegionStrategy 
is not applicable to batch jobs?
   
   Or shall we add a "region_new" option for 
AdaptedRestartPipelinedRegionStrategyNG and let users select it when running 
batch jobs?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296981716
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -576,7 +576,7 @@ else if (numSources < parallelism) {
 */
public Execution resetForNewExecution(final long timestamp, final long 
originatingGlobalModVersion)
throws GlobalModVersionMismatch {
-   LOG.debug("Resetting execution vertex {} for new execution.", 
getTaskNameWithSubtaskIndex());
+   LOG.info("Resetting execution vertex {} to CREATED state for 
new execution.", getTaskNameWithSubtaskIndex());
 
 Review comment:
   Currently we have logs tracking vertex state transitions in most cases 
(through Execution::transitionState), but lack a log for the transition to the 
CREATED state (it is set directly). I think this could be helpful for tracking 
the vertex lifecycle. 
   WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296677818
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296678129
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296678241
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a proper 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296677116
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if a global failover is ongoing
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296671876
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296671652
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a proper 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296671692
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a proper 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296671431
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a proper 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296670872
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if a global failover is ongoing
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296670958
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a proper 
vertex state.
+* <pre>
+*     (v11) -+-> (v21)
+*            x
+*     (v12) -+-> (v22)
+*
+*            ^
+*            |
+*       (blocking)
+* </pre>
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r29990
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if a global failover is ongoing
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296665390
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if a global failover is ongoing
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296665781
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if a global failover is ongoing
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296665390
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if a global failover is ongoing
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296665390
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if a global failover is ongoing
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+  

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-24 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r296664665
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph); // null-checks the graph before storing it
+   this.executionVertexVersioner = new ExecutionVertexVersioner(); // each strategy instance gets its own versioner
+   } // NOTE(review): restartPipelinedRegionStrategy is not assigned here; presumably it is set later, confirm
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if the global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if a global failover is in progress
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+