[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297567190 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,641 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import org.apache.flink.runtime.jobgraph.DistributionPattern; 
+import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297557240 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class contains scheduling logic for EAGER and LAZY_FROM_SOURCES. + * It is used for normal scheduling and legacy failover strategy re-scheduling. + */ +public class SchedulingUtils { + + /** +* Schedule vertices lazy. That means only vertices satisfying its input constraint will be scheduled. +* +* @param vertices A list of topologically sorted vertices to schedule. +* @param executionGraph The graph the given vertices belongs to. 
+*/ + public static CompletableFuture scheduleLazy( + final List vertices, + final ExecutionGraph executionGraph) { + + executionGraph.assertRunningInJobMasterMainThread(); + + final Set previousAllocations = computePriorAllocationIdsIfRequiredByScheduling( + vertices, executionGraph.getSlotProvider()); + + final ArrayList> schedulingFutures = new ArrayList<>(vertices.size()); + for (ExecutionVertex executionVertex : vertices) { + // only schedule vertex when its input constraint is satisfied + if (executionVertex.getJobVertex().getJobVertex().isInputVertex() || + executionVertex.checkInputDependencyConstraints()) { + + final CompletableFuture schedulingVertexFuture = executionVertex.scheduleForExecution( + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY, + previousAllocations); + + schedulingFutures.add(schedulingVertexFuture); + } + } + + return FutureUtils.waitForAll(schedulingFutures); + } + + /** +* Schedule vertices eagerly. That means all vertices will be scheduled at once. +* +* @param vertices A list of topologically sorted vertices to schedule. +* @param executionGraph The graph the given vertices belong to. +*/ + public static CompletableFuture scheduleEager( + final List vertices, + final ExecutionGraph executionGraph) { + + executionGraph.assertRunningInJobMasterMainThread(); + + checkState(executionGraph.getState() == JobStatus.RUNNING, "job is not running currently"); + + // Important: reserve all the space we need up front. + // that way we do not have any operation that can fail between allocating the slots + // and adding them
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297538465 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import 
org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingC
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297508269 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>())
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297510269 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import 
org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingC
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297516088 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import 
org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingC
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297508129 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>())
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297509973 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import 
org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingC
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297465153 ## File path: flink-tests/src/test/java/org/apache/flink/test/runtime/RegionFailoverITCase.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * IT case for testing region failover strategy. + */ +public class RegionFailoverITCase extends TestLogger { Review comment: I see it as an E2E test that ensures the job can succeed even when task failures happen while using the region failover strategy. IT cases do black-box verification, so they do not verify internal states (like which tasks are re-scheduled).
It just gives us confidence that the Flink runtime works well in this scenario. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297491370 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Records modifications of + * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex ExecutionVertices}, and allows + * for checking whether a vertex was modified. + * + * Examples for modifications include: + * + * cancellation of the underlying execution + * deployment of the execution vertex + * + * + * @see DefaultScheduler + */ +public class ExecutionVertexVersioner { Review comment: ExecutionVertexVersioner is from @GJL . 
He has also implemented tests for it: https://github.com/GJL/flink/blob/FLINK-12433/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java. I've ported these tests and made some minor changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297475345 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297476171 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297475345 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297466824 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = Review comment: The TestAdaptedRestartPipelinedRegionStrategyNG needs the EG as a constructor parameter, so we cannot build it before the EG is generated. --
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297470182 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>())
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297465424 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = Review comment: Making `testMainThreadUtil` static will cause it to be `null` in tests. The Resource is static because it is a ClassRule. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297466824 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = Review comment: The TestAdaptedRestartPipelinedRegionStrategyNG need EG as the constructor param. Th
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297466312 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Records modifications of + * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex ExecutionVertices}, and allows + * for checking whether a vertex was modified. + * + * Examples for modifications include: + * + * cancellation of the underlying execution + * deployment of the execution vertex + * + * + * @see DefaultScheduler + */ +public class ExecutionVertexVersioner { Review comment: Yes. Will add tests for it. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297466134 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>())
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297465694 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>())
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297465424 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = Review comment: Ok. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297465153 ## File path: flink-tests/src/test/java/org/apache/flink/test/runtime/RegionFailoverITCase.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * IT case for testing region failover strategy. + */ +public class RegionFailoverITCase extends TestLogger { Review comment: I regard it as an E2E test that ensures the job can succeed even when task failures happen while using the region failover strategy. IT cases do black-box verification, so they do not verify internal states (like which tasks are re-scheduled).
It just gives us confidence that the Flink runtime is working well in this scenario. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297463953 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java ## @@ -63,7 +63,7 @@ return new RestartAllStrategy.Factory(); case PIPELINED_REGION_RESTART_STRATEGY_NAME: Review comment: Ok. Let's make the adapted region failover strategy the default in another PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297169235 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import 
org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingC
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297166582 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGTest.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; +import org.apache.flink.runtime.executiongraph.failover.PipelinedFailoverRegionBuildingTest; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategyBuildingTest; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Iterator; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGTest extends TestLogger { + + /** +* Test that {@link AdaptedRestartPipelinedRegionStrategyNG} is loaded +* as failover strategy if {@link JobManagerOptions#EXECUTION_FAILOVER_STRATEGY} +* value is "region". +*/ + @Test + public void testAdaptedRestartPipelinedRegionStrategyNGLoading() throws Exception { + final ExecutionGraph eg = createExecutionGraph(new JobGraph()); + + assertTrue(eg.getFailoverStrategy() instanceof AdaptedRestartPipelinedRegionStrategyNG); Review comment: Fine. Let's omit this test as the loading behavior is actually covered by some other AdaptedRestartPipelinedRegionStrategyNG tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297165275 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGTest.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; +import org.apache.flink.runtime.executiongraph.failover.PipelinedFailoverRegionBuildingTest; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategyBuildingTest; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Iterator; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGTest extends TestLogger { + + /** +* Test that {@link AdaptedRestartPipelinedRegionStrategyNG} is loaded +* as failover strategy if {@link JobManagerOptions#EXECUTION_FAILOVER_STRATEGY} +* value is "region". +*/ + @Test + public void testAdaptedRestartPipelinedRegionStrategyNGLoading() throws Exception { + final ExecutionGraph eg = createExecutionGraph(new JobGraph()); + + assertTrue(eg.getFailoverStrategy() instanceof AdaptedRestartPipelinedRegionStrategyNG); + } + + /** +* Test whether region building works. This helps to make sure that the +* overall region building process is not breaking. +* More detailed region building verification is covered +* in {@link RestartPipelinedRegionStrategyBuildingTest} +* +* (v11) --> (v21) -+-> (v31) --> (v41) +* x +* (v12) --> (v22) -+-> (v32) --> (v42) +* +* ^ +* | +* (blocking) +* +* 4 regions. Each region has 2 pipelined connected vertices. +*/ + @Test + public void testRegionBuilding() throws Exception { Review comment: Ok. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297082648 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java ## @@ -63,7 +63,7 @@ return new RestartAllStrategy.Factory(); case PIPELINED_REGION_RESTART_STRATEGY_NAME: Review comment: Ok. Let's take this way. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297080370 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java ## @@ -34,22 +34,4 @@ * @return set of IDs of vertices to restart */ Set getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause); - - // - // factory - // - - /** -* The factory to instantiate {@link FailoverStrategy}. -*/ - interface Factory { Review comment: Ok. Let's exclude this change for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297079565 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297071613 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -576,7 +576,7 @@ else if (numSources < parallelism) { */ public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion) throws GlobalModVersionMismatch { - LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex()); + LOG.info("Resetting execution vertex {} to CREATED state for new execution.", getTaskNameWithSubtaskIndex()); Review comment: Ok. Let's revert this change and do it later in a separate fix. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296994653 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java ## @@ -34,22 +34,4 @@ * @return set of IDs of vertices to restart */ Set getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause); - - // - // factory - // - - /** -* The factory to instantiate {@link FailoverStrategy}. -*/ - interface Factory { Review comment: So far I think there is no need to create a JM-shared FailoverStrategy factory. Each JM/Scheduler can create its own failoverStrategy directly. We should implement a loader util for creating failoverStrategy from configuration, though. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296999324 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedSchedulingUtils.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +/** + * This class contains scheduling logic similar to that in ExecutionGraph. + * It is used by legacy failover strategy as the strategy will do the re-scheduling work by itself. + * We extract it from ExecutionGraph to avoid affect existing scheduling logic. + */ +public class AdaptedSchedulingUtils { + + static CompletableFuture scheduleLazy( + final Set vertices, + final ExecutionGraph executionGraph) { + + final Set previousAllocations = getAllPriorAllocationIds(vertices); + + final ArrayList> schedulingFutures = new ArrayList<>(vertices.size()); + for (ExecutionVertex executionVertex : vertices) { Review comment: The discussion above is about LAZY scheduling. For EAGER mode, unordered scheduling increases the chance that downstream tasks get launched earlier than their upstream tasks. In this case the downstream tasks will fail to request partitions and will trigger lots of partition state check RPCs to the JM, which may result in the JM main thread getting jammed. So I think it can be beneficial to order the vertices before scheduling, for both eager and lazy mode. Will do it in the next commit.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296998163 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedSchedulingUtils.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +/** + * This class contains scheduling logic similar to that in ExecutionGraph. + * It is used by legacy failover strategy as the strategy will do the re-scheduling work by itself. + * We extract it from ExecutionGraph to avoid affect existing scheduling logic. 
+ */ +public class AdaptedSchedulingUtils { + + static CompletableFuture scheduleLazy( + final Set vertices, + final ExecutionGraph executionGraph) { + + final Set previousAllocations = getAllPriorAllocationIds(vertices); + + final ArrayList> schedulingFutures = new ArrayList<>(vertices.size()); + for (ExecutionVertex executionVertex : vertices) { + // only schedule vertex when its input constraint is satisfied + if (executionVertex.getJobVertex().getJobVertex().isInputVertex() || + executionVertex.checkInputDependencyConstraints()) { + + final CompletableFuture schedulingVertexFuture = executionVertex.scheduleForExecution( + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY, + previousAllocations); + + schedulingFutures.add(schedulingVertexFuture); + } + } + + return FutureUtils.waitForAll(schedulingFutures); + } + + static CompletableFuture scheduleEager( Review comment: I think it's a balance between simplicity, risk and maintainability. I'd be fine with having the EG call these methods to do the scheduling. Will do this in the next commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296998163 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedSchedulingUtils.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +/** + * This class contains scheduling logic similar to that in ExecutionGraph. + * It is used by legacy failover strategy as the strategy will do the re-scheduling work by itself. + * We extract it from ExecutionGraph to avoid affect existing scheduling logic. 
+ */ +public class AdaptedSchedulingUtils { + + static CompletableFuture scheduleLazy( + final Set vertices, + final ExecutionGraph executionGraph) { + + final Set previousAllocations = getAllPriorAllocationIds(vertices); + + final ArrayList> schedulingFutures = new ArrayList<>(vertices.size()); + for (ExecutionVertex executionVertex : vertices) { + // only schedule vertex when its input constraint is satisfied + if (executionVertex.getJobVertex().getJobVertex().isInputVertex() || + executionVertex.checkInputDependencyConstraints()) { + + final CompletableFuture schedulingVertexFuture = executionVertex.scheduleForExecution( + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY, + previousAllocations); + + schedulingFutures.add(schedulingVertexFuture); + } + } + + return FutureUtils.waitForAll(schedulingFutures); + } + + static CompletableFuture scheduleEager( Review comment: I think it's a balance between simplicity, risk, and maintainability. I'd be fine with having the EG call these methods to do the scheduling. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296994653 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java ## @@ -34,22 +34,4 @@ * @return set of IDs of vertices to restart */ Set getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause); - - // - // factory - // - - /** -* The factory to instantiate {@link FailoverStrategy}. -*/ - interface Factory { Review comment: So far, I think there is no need to create a JM-shared FailoverStrategy factory. Each JM/scheduler can create its own failoverStrategy directly. We should, however, implement a loader util for creating the failoverStrategy from the ExecutionGraph. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296993735 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296992297 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; Review comment: Sorry, I missed checking that. Will fix it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296989612 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedSchedulingUtils.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +/** + * This class contains scheduling logic similar to that in ExecutionGraph. + * It is used by legacy failover strategy as the strategy will do the re-scheduling work by itself. + * We extract it from ExecutionGraph to avoid affect existing scheduling logic. + */ +public class AdaptedSchedulingUtils { + + static CompletableFuture scheduleLazy( + final Set vertices, + final ExecutionGraph executionGraph) { + + final Set previousAllocations = getAllPriorAllocationIds(vertices); + + final ArrayList> schedulingFutures = new ArrayList<>(vertices.size()); + for (ExecutionVertex executionVertex : vertices) { Review comment: Theoretically, it should not cause any issue, as these vertices are scheduled individually and only vertices satisfying the input constraint will be scheduled. In practice, however, there is a slightly higher chance that a downstream task gets launched earlier than its source tasks and causes a resource deadlock. But even scheduling in topological order does not prevent this from happening.
To avoid a resource deadlock, users need to set `InputDependencyConstraint` to ALL. In that case, any scheduling order should be fine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296987179 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java ## @@ -63,7 +63,7 @@ return new RestartAllStrategy.Factory(); case PIPELINED_REGION_RESTART_STRATEGY_NAME: Review comment: Shall we make AdaptedRestartPipelinedRegionStrategyNG the default "region" failover strategy and add a "region_legacy" option for RestartPipelinedRegionStrategy, since the legacy RestartPipelinedRegionStrategy is not applicable to batch jobs? Alternatively, we could add a "region_new" option for AdaptedRestartPipelinedRegionStrategyNG and let users select it when running batch jobs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296981716 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -576,7 +576,7 @@ else if (numSources < parallelism) { */ public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion) throws GlobalModVersionMismatch { - LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex()); + LOG.info("Resetting execution vertex {} to CREATED state for new execution.", getTaskNameWithSubtaskIndex()); Review comment: Currently, we log vertex state transitions in most cases (through Execution::transitionState), but we lack a log for the transition to the CREATED state (it is set directly). I think this log could be helpful for tracking the vertex lifecycle. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296677818 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296678129 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296678241 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296677116 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip the local failover if a global failover is in progress + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296671876 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import 
org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296671652 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a proper vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296671692 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a proper vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296671431 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a proper vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296670872 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip the local failover if a global failover is in progress + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296670958 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + public final int DEFAULT_PARALLELISM = 2; + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private final TestingComponentMainThreadExecutor testMainThreadUtil = + EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + + /** +* Tests that 2 concurrent region failovers can lead to a proper vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final JobID jid = new JobID(); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, DEFAULT_PARALLELISM); + final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); + + final ExecutionGraph eg = createExecutionGraph( + jid, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + restartStrategy, + slotProvider); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r29990 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296665390 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296665781 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296665390 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296665390 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r296664665 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + // skip the failover if global restart strategy suppresses restarts + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + executionGraph.failGlobal(cause); + return; + } + + // skip local failover if is in global failover + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; +