This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9c23a0b86121d6361df403a05f75ba4b3902735
Author: Gary Yao <g...@apache.org>
AuthorDate: Tue Apr 21 15:59:27 2020 +0200

    [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RestartPipelinedRegionFailoverStrategy
    
    Avoid re-computing pipelined regions in the
    RestartPipelinedRegionFailoverStrategy using the
    PipelinedRegionComputeUtil. Instead, rely on the pipelined regions
    provided by the Topology.
    
    This closes #11857.
---
 .../failover/flip1/FailoverRegion.java             |  68 -----------
 .../RestartPipelinedRegionFailoverStrategy.java    |  63 +++--------
 ...ipelinedRegionFailoverStrategyBuildingTest.java | 125 +++++++++++----------
 3 files changed, 78 insertions(+), 178 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
deleted file mode 100644
index d1efb6f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.failover.flip1;
-
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * FailoverRegion is a subset of all the vertices in the job topology.
- */
-public class FailoverRegion {
-
-       /** All vertex IDs in this region. */
-       private final Set<ExecutionVertexID> executionVertexIDs;
-
-       /** All vertices in this region. */
-       private final Set<? extends SchedulingExecutionVertex> 
executionVertices;
-
-       /**
-        * Creates a new failover region containing a set of vertices.
-        *
-        * @param executionVertices to be contained in this region
-        */
-       public FailoverRegion(Set<? extends SchedulingExecutionVertex> 
executionVertices) {
-               this.executionVertices = checkNotNull(executionVertices);
-               this.executionVertexIDs = new HashSet<>();
-               executionVertices.forEach(v -> 
this.executionVertexIDs.add(v.getId()));
-       }
-
-       /**
-        * Returns IDs of all vertices in this region.
-        *
-        * @return IDs of all vertices in this region
-        */
-       public Set<ExecutionVertexID> getAllExecutionVertexIDs() {
-               return executionVertexIDs;
-       }
-
-       /**
-        * Returns all vertices in this region.
-        *
-        * @return all vertices in this region
-        */
-       public Set<? extends SchedulingExecutionVertex> 
getAllExecutionVertices() {
-               return executionVertices;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
index 3c158f0..eb8d06b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionException;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.util.ExceptionUtils;
@@ -31,10 +32,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
@@ -53,12 +52,6 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
        /** The topology containing info about all the vertices and result 
partitions. */
        private final SchedulingTopology topology;
 
-       /** All failover regions. */
-       private final Set<FailoverRegion> regions;
-
-       /** Maps execution vertex id to failover region. */
-       private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap;
-
        /** The checker helps to query result partition availability. */
        private final RegionFailoverResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker;
 
@@ -84,34 +77,8 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
                ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker) {
 
                this.topology = checkNotNull(topology);
-               this.regions = Collections.newSetFromMap(new 
IdentityHashMap<>());
-               this.vertexToRegionMap = new HashMap<>();
                this.resultPartitionAvailabilityChecker = new 
RegionFailoverResultPartitionAvailabilityChecker(
                        resultPartitionAvailabilityChecker);
-
-               // build regions based on the given topology
-               LOG.info("Start building failover regions.");
-               buildFailoverRegions();
-       }
-       // 
------------------------------------------------------------------------
-       //  region building
-       // 
------------------------------------------------------------------------
-
-       private void buildFailoverRegions() {
-               final Set<Set<SchedulingExecutionVertex>> distinctRegions =
-                       
PipelinedRegionComputeUtil.computePipelinedRegions(topology);
-
-               // creating all the failover regions and register them
-               for (Set<SchedulingExecutionVertex> regionVertices : 
distinctRegions) {
-                       LOG.debug("Creating a failover region with {} 
vertices.", regionVertices.size());
-                       final FailoverRegion failoverRegion = new 
FailoverRegion(regionVertices);
-                       regions.add(failoverRegion);
-                       for (SchedulingExecutionVertex vertex : regionVertices) 
{
-                               vertexToRegionMap.put(vertex.getId(), 
failoverRegion);
-                       }
-               }
-
-               LOG.info("Created {} failover regions.", regions.size());
        }
 
 
@@ -136,7 +103,7 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
        public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID 
executionVertexId, Throwable cause) {
                LOG.info("Calculating tasks to restart to recover the failed 
task {}.", executionVertexId);
 
-               final FailoverRegion failedRegion = 
vertexToRegionMap.get(executionVertexId);
+               final SchedulingPipelinedRegion failedRegion = 
topology.getPipelinedRegionOfVertex(executionVertexId);
                if (failedRegion == null) {
                        // TODO: show the task name in the log
                        throw new IllegalStateException("Can not find the 
failover region for task " + executionVertexId, cause);
@@ -153,8 +120,8 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
 
                // calculate the tasks to restart based on the result of 
regions to restart
                Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
-               for (FailoverRegion region : getRegionsToRestart(failedRegion)) 
{
-                       
tasksToRestart.addAll(region.getAllExecutionVertexIDs());
+               for (SchedulingPipelinedRegion region : 
getRegionsToRestart(failedRegion)) {
+                       region.getVertices().forEach(vertex -> 
tasksToRestart.add(vertex.getId()));
                }
 
                // the previous failed partition will be recovered. remove its 
failed state from the checker
@@ -175,25 +142,25 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
         *    the region containing the partition producer task is involved
         * 3. If a region is involved, all of its consumer regions are involved
         */
-       private Set<FailoverRegion> getRegionsToRestart(FailoverRegion 
failedRegion) {
-               Set<FailoverRegion> regionsToRestart = 
Collections.newSetFromMap(new IdentityHashMap<>());
-               Set<FailoverRegion> visitedRegions = 
Collections.newSetFromMap(new IdentityHashMap<>());
+       private Set<SchedulingPipelinedRegion> 
getRegionsToRestart(SchedulingPipelinedRegion failedRegion) {
+               Set<SchedulingPipelinedRegion> regionsToRestart = 
Collections.newSetFromMap(new IdentityHashMap<>());
+               Set<SchedulingPipelinedRegion> visitedRegions = 
Collections.newSetFromMap(new IdentityHashMap<>());
 
                // start from the failed region to visit all involved regions
-               Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
+               Queue<SchedulingPipelinedRegion> regionsToVisit = new 
ArrayDeque<>();
                visitedRegions.add(failedRegion);
                regionsToVisit.add(failedRegion);
                while (!regionsToVisit.isEmpty()) {
-                       FailoverRegion regionToRestart = regionsToVisit.poll();
+                       SchedulingPipelinedRegion regionToRestart = 
regionsToVisit.poll();
 
                        // an involved region should be restarted
                        regionsToRestart.add(regionToRestart);
 
                        // if a needed input result partition is not available, 
its producer region is involved
-                       for (SchedulingExecutionVertex vertex : 
regionToRestart.getAllExecutionVertices()) {
+                       for (SchedulingExecutionVertex vertex : 
regionToRestart.getVertices()) {
                                for (SchedulingResultPartition 
consumedPartition : vertex.getConsumedResults()) {
                                        if 
(!resultPartitionAvailabilityChecker.isAvailable(consumedPartition.getId())) {
-                                               FailoverRegion producerRegion = 
vertexToRegionMap.get(consumedPartition.getProducer().getId());
+                                               SchedulingPipelinedRegion 
producerRegion = 
topology.getPipelinedRegionOfVertex(consumedPartition.getProducer().getId());
                                                if 
(!visitedRegions.contains(producerRegion)) {
                                                        
visitedRegions.add(producerRegion);
                                                        
regionsToVisit.add(producerRegion);
@@ -203,10 +170,10 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
                        }
 
                        // all consumer regions of an involved region should be 
involved
-                       for (SchedulingExecutionVertex vertex : 
regionToRestart.getAllExecutionVertices()) {
+                       for (SchedulingExecutionVertex vertex : 
regionToRestart.getVertices()) {
                                for (SchedulingResultPartition 
producedPartition : vertex.getProducedResults()) {
                                        for (SchedulingExecutionVertex 
consumerVertex : producedPartition.getConsumers()) {
-                                               FailoverRegion consumerRegion = 
vertexToRegionMap.get(consumerVertex.getId());
+                                               SchedulingPipelinedRegion 
consumerRegion = topology.getPipelinedRegionOfVertex(consumerVertex.getId());
                                                if 
(!visitedRegions.contains(consumerRegion)) {
                                                        
visitedRegions.add(consumerRegion);
                                                        
regionsToVisit.add(consumerRegion);
@@ -229,8 +196,8 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
         * @return the failover region that contains the given execution vertex
         */
        @VisibleForTesting
-       public FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) {
-               return vertexToRegionMap.get(vertexID);
+       public SchedulingPipelinedRegion getFailoverRegion(ExecutionVertexID 
vertexID) {
+               return topology.getPipelinedRegionOfVertex(vertexID);
        }
 
        /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyBuildingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyBuildingTest.java
index a9cdcff..d3fa17e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyBuildingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyBuildingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
 import 
org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
 import org.apache.flink.util.TestLogger;
@@ -55,9 +56,9 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
-               FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
-               FailoverRegion r3 = strategy.getFailoverRegion(v3.getId());
+               SchedulingPipelinedRegion r1 = 
strategy.getFailoverRegion(v1.getId());
+               SchedulingPipelinedRegion r2 = 
strategy.getFailoverRegion(v2.getId());
+               SchedulingPipelinedRegion r3 = 
strategy.getFailoverRegion(v3.getId());
 
                assertDistinctRegions(r1, r2, r3);
        }
@@ -91,12 +92,12 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
-               FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
-               FailoverRegion ra3 = strategy.getFailoverRegion(va3.getId());
-               FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId());
-               FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId());
-               FailoverRegion rb3 = strategy.getFailoverRegion(vb3.getId());
+               SchedulingPipelinedRegion ra1 = 
strategy.getFailoverRegion(va1.getId());
+               SchedulingPipelinedRegion ra2 = 
strategy.getFailoverRegion(va2.getId());
+               SchedulingPipelinedRegion ra3 = 
strategy.getFailoverRegion(va3.getId());
+               SchedulingPipelinedRegion rb1 = 
strategy.getFailoverRegion(vb1.getId());
+               SchedulingPipelinedRegion rb2 = 
strategy.getFailoverRegion(vb2.getId());
+               SchedulingPipelinedRegion rb3 = 
strategy.getFailoverRegion(vb3.getId());
 
                assertSameRegion(ra1, rb1);
                assertSameRegion(ra2, rb2);
@@ -138,12 +139,12 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
-               FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
-               FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId());
-               FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId());
-               FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getId());
-               FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getId());
+               SchedulingPipelinedRegion ra1 = 
strategy.getFailoverRegion(va1.getId());
+               SchedulingPipelinedRegion ra2 = 
strategy.getFailoverRegion(va2.getId());
+               SchedulingPipelinedRegion rb1 = 
strategy.getFailoverRegion(vb1.getId());
+               SchedulingPipelinedRegion rb2 = 
strategy.getFailoverRegion(vb2.getId());
+               SchedulingPipelinedRegion rc1 = 
strategy.getFailoverRegion(vc1.getId());
+               SchedulingPipelinedRegion rc2 = 
strategy.getFailoverRegion(vc2.getId());
 
                assertSameRegion(ra1, ra2, rb1, rb2, rc1, rc2);
        }
@@ -184,13 +185,13 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
-               FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
-               FailoverRegion r3 = strategy.getFailoverRegion(v3.getId());
-               FailoverRegion r4 = strategy.getFailoverRegion(v4.getId());
-               FailoverRegion r5 = strategy.getFailoverRegion(v5.getId());
-               FailoverRegion r6 = strategy.getFailoverRegion(v6.getId());
-               FailoverRegion r7 = strategy.getFailoverRegion(v7.getId());
+               SchedulingPipelinedRegion r1 = 
strategy.getFailoverRegion(v1.getId());
+               SchedulingPipelinedRegion r2 = 
strategy.getFailoverRegion(v2.getId());
+               SchedulingPipelinedRegion r3 = 
strategy.getFailoverRegion(v3.getId());
+               SchedulingPipelinedRegion r4 = 
strategy.getFailoverRegion(v4.getId());
+               SchedulingPipelinedRegion r5 = 
strategy.getFailoverRegion(v5.getId());
+               SchedulingPipelinedRegion r6 = 
strategy.getFailoverRegion(v6.getId());
+               SchedulingPipelinedRegion r7 = 
strategy.getFailoverRegion(v7.getId());
 
                assertSameRegion(r1, r2, r3, r4, r5, r6, r7);
        }
@@ -231,13 +232,13 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
-               FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
-               FailoverRegion r3 = strategy.getFailoverRegion(v3.getId());
-               FailoverRegion r4 = strategy.getFailoverRegion(v4.getId());
-               FailoverRegion r5 = strategy.getFailoverRegion(v5.getId());
-               FailoverRegion r6 = strategy.getFailoverRegion(v6.getId());
-               FailoverRegion r7 = strategy.getFailoverRegion(v7.getId());
+               SchedulingPipelinedRegion r1 = 
strategy.getFailoverRegion(v1.getId());
+               SchedulingPipelinedRegion r2 = 
strategy.getFailoverRegion(v2.getId());
+               SchedulingPipelinedRegion r3 = 
strategy.getFailoverRegion(v3.getId());
+               SchedulingPipelinedRegion r4 = 
strategy.getFailoverRegion(v4.getId());
+               SchedulingPipelinedRegion r5 = 
strategy.getFailoverRegion(v5.getId());
+               SchedulingPipelinedRegion r6 = 
strategy.getFailoverRegion(v6.getId());
+               SchedulingPipelinedRegion r7 = 
strategy.getFailoverRegion(v7.getId());
 
                assertSameRegion(r1, r2, r3, r4, r5, r6, r7);
        }
@@ -275,12 +276,12 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
-               FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
-               FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId());
-               FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId());
-               FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getId());
-               FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getId());
+               SchedulingPipelinedRegion ra1 = 
strategy.getFailoverRegion(va1.getId());
+               SchedulingPipelinedRegion ra2 = 
strategy.getFailoverRegion(va2.getId());
+               SchedulingPipelinedRegion rb1 = 
strategy.getFailoverRegion(vb1.getId());
+               SchedulingPipelinedRegion rb2 = 
strategy.getFailoverRegion(vb2.getId());
+               SchedulingPipelinedRegion rc1 = 
strategy.getFailoverRegion(vc1.getId());
+               SchedulingPipelinedRegion rc2 = 
strategy.getFailoverRegion(vc2.getId());
 
                assertSameRegion(ra1, ra2, rb1, rb2);
 
@@ -322,12 +323,12 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
-               FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
-               FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId());
-               FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId());
-               FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getId());
-               FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getId());
+               SchedulingPipelinedRegion ra1 = 
strategy.getFailoverRegion(va1.getId());
+               SchedulingPipelinedRegion ra2 = 
strategy.getFailoverRegion(va2.getId());
+               SchedulingPipelinedRegion rb1 = 
strategy.getFailoverRegion(vb1.getId());
+               SchedulingPipelinedRegion rb2 = 
strategy.getFailoverRegion(vb2.getId());
+               SchedulingPipelinedRegion rc1 = 
strategy.getFailoverRegion(vc1.getId());
+               SchedulingPipelinedRegion rc2 = 
strategy.getFailoverRegion(vc2.getId());
 
                assertSameRegion(ra1, ra2, rb1, rb2);
 
@@ -374,13 +375,13 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
-               FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
-               FailoverRegion r3 = strategy.getFailoverRegion(v3.getId());
-               FailoverRegion r4 = strategy.getFailoverRegion(v4.getId());
-               FailoverRegion r5 = strategy.getFailoverRegion(v5.getId());
-               FailoverRegion r6 = strategy.getFailoverRegion(v6.getId());
-               FailoverRegion r7 = strategy.getFailoverRegion(v7.getId());
+               SchedulingPipelinedRegion r1 = 
strategy.getFailoverRegion(v1.getId());
+               SchedulingPipelinedRegion r2 = 
strategy.getFailoverRegion(v2.getId());
+               SchedulingPipelinedRegion r3 = 
strategy.getFailoverRegion(v3.getId());
+               SchedulingPipelinedRegion r4 = 
strategy.getFailoverRegion(v4.getId());
+               SchedulingPipelinedRegion r5 = 
strategy.getFailoverRegion(v5.getId());
+               SchedulingPipelinedRegion r6 = 
strategy.getFailoverRegion(v6.getId());
+               SchedulingPipelinedRegion r7 = 
strategy.getFailoverRegion(v7.getId());
 
                assertSameRegion(r1, r2, r5);
                assertSameRegion(r3, r4, r6);
@@ -418,10 +419,10 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
-               FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
-               FailoverRegion r3 = strategy.getFailoverRegion(v3.getId());
-               FailoverRegion r4 = strategy.getFailoverRegion(v4.getId());
+               SchedulingPipelinedRegion r1 = 
strategy.getFailoverRegion(v1.getId());
+               SchedulingPipelinedRegion r2 = 
strategy.getFailoverRegion(v2.getId());
+               SchedulingPipelinedRegion r3 = 
strategy.getFailoverRegion(v3.getId());
+               SchedulingPipelinedRegion r4 = 
strategy.getFailoverRegion(v4.getId());
 
                assertSameRegion(r1, r2, r3, r4);
        }
@@ -459,10 +460,10 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
-               FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
-               FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId());
-               FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId());
+               SchedulingPipelinedRegion ra1 = 
strategy.getFailoverRegion(va1.getId());
+               SchedulingPipelinedRegion ra2 = 
strategy.getFailoverRegion(va2.getId());
+               SchedulingPipelinedRegion rb1 = 
strategy.getFailoverRegion(vb1.getId());
+               SchedulingPipelinedRegion rb2 = 
strategy.getFailoverRegion(vb2.getId());
 
                assertSameRegion(ra1, ra2, rb1, rb2);
        }
@@ -494,10 +495,10 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
 
                RestartPipelinedRegionFailoverStrategy strategy = new 
RestartPipelinedRegionFailoverStrategy(topology);
 
-               FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
-               FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
-               FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId());
-               FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId());
+               SchedulingPipelinedRegion ra1 = 
strategy.getFailoverRegion(va1.getId());
+               SchedulingPipelinedRegion ra2 = 
strategy.getFailoverRegion(va2.getId());
+               SchedulingPipelinedRegion rb1 = 
strategy.getFailoverRegion(vb1.getId());
+               SchedulingPipelinedRegion rb2 = 
strategy.getFailoverRegion(vb2.getId());
 
                assertSameRegion(ra1, ra2, rb1, rb2);
        }
@@ -506,7 +507,7 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
        //  utilities
        // 
------------------------------------------------------------------------
 
-       public static void assertSameRegion(FailoverRegion ...regions) {
+       public static void assertSameRegion(SchedulingPipelinedRegion 
...regions) {
                checkNotNull(regions);
                for (int i = 0; i < regions.length; i++) {
                        for (int j = i + 1; i < regions.length; i++) {
@@ -515,7 +516,7 @@ public class 
RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogg
                }
        }
 
-       public static void assertDistinctRegions(FailoverRegion ...regions) {
+       public static void assertDistinctRegions(SchedulingPipelinedRegion 
...regions) {
                checkNotNull(regions);
                for (int i = 0; i < regions.length; i++) {
                        for (int j = i + 1; j < regions.length; j++) {

Reply via email to