This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b62db93  [FLINK-12203] Refactor ResultPartitionManager to break tie with Task
b62db93 is described below

commit b62db93bf63cb3bb34dd03d611a779d9e3fc61ac
Author: Andrey Zagrebin <azagre...@gmail.com>
AuthorDate: Thu Apr 18 15:26:24 2019 +0200

    [FLINK-12203] Refactor ResultPartitionManager to break tie with Task
    
    At the moment, we have ResultPartitionManager.releasePartitionsProducedBy,
    which indexes partitions by task in the network environment. These methods
    are ultimately used only by Task, which already knows its partitions, so Task
    can use ResultPartition.fail(cause) and TaskExecutor.failPartition can
    directly use NetworkEnvironment.releasePartitions(Collection<ResultPartitionID>).
    This also requires that the JM Execution sends the produced partition IDs
    instead of just the ExecutionAttemptID.
    
    This closes #8210.
---
 .../flink/runtime/executiongraph/Execution.java    | 20 ++++++---
 .../runtime/io/network/NetworkEnvironment.java     | 13 ++++++
 .../io/network/partition/ResultPartition.java      |  2 +-
 .../network/partition/ResultPartitionManager.java  | 50 ++++++----------------
 .../jobmanager/slots/TaskManagerGateway.java       |  8 ++--
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  6 ++-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  7 ++-
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  8 ++--
 .../utils/SimpleAckingTaskManagerGateway.java      |  5 ++-
 .../taskexecutor/TestingTaskExecutorGateway.java   |  4 +-
 10 files changed, 65 insertions(+), 58 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 63f3125..e413619 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -688,7 +689,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        else if (current == FINISHED || current == FAILED) {
                                // nothing to do any more. finished failed 
before it could be cancelled.
                                // in any case, the task is removed from the 
TaskManager already
-                               sendFailIntermediateResultPartitionsRpcCall();
+                               
sendReleaseIntermediateResultPartitionsRpcCall();
 
                                return;
                        }
@@ -721,7 +722,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                break;
                        case FINISHED:
                        case FAILED:
-                               sendFailIntermediateResultPartitionsRpcCall();
+                               
sendReleaseIntermediateResultPartitionsRpcCall();
                                break;
                        case CANCELED:
                                break;
@@ -1202,14 +1203,23 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                }
        }
 
-       private void sendFailIntermediateResultPartitionsRpcCall() {
+       private void sendReleaseIntermediateResultPartitionsRpcCall() {
+               LOG.info("Discarding the results produced by task execution 
{}.", attemptId);
                final LogicalSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
 
-                       // TODO For some tests this could be a problem when 
querying too early if all resources were released
-                       taskManagerGateway.failPartition(attemptId);
+                       Collection<IntermediateResultPartition> partitions = 
vertex.getProducedPartitions().values();
+                       Collection<ResultPartitionID> partitionIds = new 
ArrayList<>(partitions.size());
+                       for (IntermediateResultPartition partition : 
partitions) {
+                               partitionIds.add(new 
ResultPartitionID(partition.getPartitionId(), attemptId));
+                       }
+
+                       if (!partitionIds.isEmpty()) {
+                               // TODO For some tests this could be a problem 
when querying too early if all resources were released
+                               
taskManagerGateway.releasePartitions(partitionIds);
+                       }
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 98c61a4..0ee8595 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
@@ -38,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -207,6 +209,17 @@ public class NetworkEnvironment {
                }
        }
 
+       /**
+        * Batch release intermediate result partitions.
+        *
+        * @param partitionIds partition ids to release
+        */
+       public void releasePartitions(Collection<ResultPartitionID> 
partitionIds) {
+               for (ResultPartitionID partitionId : partitionIds) {
+                       resultPartitionManager.releasePartition(partitionId, 
null);
+               }
+       }
+
        public void start() throws IOException {
                synchronized (lock) {
                        Preconditions.checkState(!isShutdown, "The 
NetworkEnvironment has already been shut down.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index e0e7829..fb73a70 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -334,7 +334,7 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
        }
 
        public void fail(@Nullable Throwable throwable) {
-               
partitionManager.releasePartitionsProducedBy(partitionId.getProducerId(), 
throwable);
+               partitionManager.releasePartition(partitionId, throwable);
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 09a62ed..8d96e2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -18,17 +18,11 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import 
org.apache.flink.shaded.guava18.com.google.common.collect.HashBasedTable;
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Table;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -41,19 +35,15 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ResultPartitionManager.class);
 
-       public final Table<ExecutionAttemptID, IntermediateResultPartitionID, 
ResultPartition>
-                       registeredPartitions = HashBasedTable.create();
+       private final Map<ResultPartitionID, ResultPartition> 
registeredPartitions = new HashMap<>(16);
 
        private boolean isShutdown;
 
-       public void registerResultPartition(ResultPartition partition) throws 
IOException {
+       public void registerResultPartition(ResultPartition partition) {
                synchronized (registeredPartitions) {
                        checkState(!isShutdown, "Result partition manager 
already shut down.");
 
-                       ResultPartitionID partitionId = 
partition.getPartitionId();
-
-                       ResultPartition previous = registeredPartitions.put(
-                                       partitionId.getProducerId(), 
partitionId.getPartitionId(), partition);
+                       ResultPartition previous = 
registeredPartitions.put(partition.getPartitionId(), partition);
 
                        if (previous != null) {
                                throw new IllegalStateException("Result 
partition already registered.");
@@ -70,8 +60,7 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
                        BufferAvailabilityListener availabilityListener) throws 
IOException {
 
                synchronized (registeredPartitions) {
-                       final ResultPartition partition = 
registeredPartitions.get(partitionId.getProducerId(),
-                                       partitionId.getPartitionId());
+                       final ResultPartition partition = 
registeredPartitions.get(partitionId);
 
                        if (partition == null) {
                                throw new 
PartitionNotFoundException(partitionId);
@@ -83,26 +72,14 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
                }
        }
 
-       public void releasePartitionsProducedBy(ExecutionAttemptID executionId) 
{
-               releasePartitionsProducedBy(executionId, null);
-       }
-
-       public void releasePartitionsProducedBy(ExecutionAttemptID executionId, 
Throwable cause) {
+       public void releasePartition(ResultPartitionID partitionId, Throwable 
cause) {
                synchronized (registeredPartitions) {
-                       final Map<IntermediateResultPartitionID, 
ResultPartition> partitions =
-                                       registeredPartitions.row(executionId);
-
-                       for (ResultPartition partition : partitions.values()) {
-                               partition.release(cause);
+                       ResultPartition resultPartition = 
registeredPartitions.remove(partitionId);
+                       if (resultPartition != null) {
+                               resultPartition.release(cause);
+                               LOG.debug("Released partition {} produced by 
{}.",
+                                       partitionId.getPartitionId(), 
partitionId.getProducerId());
                        }
-
-                       for (IntermediateResultPartitionID partitionId : 
ImmutableList
-                                       .copyOf(partitions.keySet())) {
-
-                               registeredPartitions.remove(executionId, 
partitionId);
-                       }
-
-                       LOG.debug("Released all partitions produced by {}.", 
executionId);
                }
        }
 
@@ -134,10 +111,7 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
                LOG.debug("Received consume notification from {}.", partition);
 
                synchronized (registeredPartitions) {
-                       ResultPartitionID partitionId = 
partition.getPartitionId();
-
-                       previous = 
registeredPartitions.remove(partitionId.getProducerId(),
-                                       partitionId.getPartitionId());
+                       previous = 
registeredPartitions.remove(partition.getPartitionId());
                }
 
                // Release the partition if it was successfully removed
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 6922b05..593a853 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -25,10 +25,12 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -98,11 +100,11 @@ public interface TaskManagerGateway {
                Time timeout);
 
        /**
-        * Fail all intermediate result partitions of the given task.
+        * Batch release intermediate result partitions.
         *
-        * @param executionAttemptID identifying the task
+        * @param partitionIds partition ids to release
         */
-       void failPartition(ExecutionAttemptID executionAttemptID);
+       void releasePartitions(Collection<ResultPartitionID> partitionIds);
 
        /**
         * Notify the given task about a completed checkpoint.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 064eef5..1fb2d49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -25,12 +25,14 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -86,8 +88,8 @@ public class RpcTaskManagerGateway implements 
TaskManagerGateway {
        }
 
        @Override
-       public void failPartition(ExecutionAttemptID executionAttemptID) {
-               taskExecutorGateway.failPartition(executionAttemptID);
+       public void releasePartitions(Collection<ResultPartitionID> 
partitionIds) {
+               taskExecutorGateway.releasePartitions(partitionIds);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9b9ad5b..b35d65e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -649,11 +650,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        }
 
        @Override
-       public void failPartition(ExecutionAttemptID executionAttemptID) {
-               log.info("Discarding the results produced by task execution 
{}.", executionAttemptID);
-
+       public void releasePartitions(Collection<ResultPartitionID> 
partitionIds) {
                try {
-                       
networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
+                       networkEnvironment.releasePartitions(partitionIds);
                } catch (Throwable t) {
                        // TODO: Do we still need this catch branch?
                        onFatalError(t);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 728087a..8a653df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.SerializableOptional;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -99,11 +101,11 @@ public interface TaskExecutorGateway extends RpcGateway {
                @RpcTimeout Time timeout);
 
        /**
-        * Fail all intermediate result partitions of the given task.
+        * Batch release intermediate result partitions.
         *
-        * @param executionAttemptID identifying the task
+        * @param partitionIds partition ids to release
         */
-       void failPartition(ExecutionAttemptID executionAttemptID);
+       void releasePartitions(Collection<ResultPartitionID> partitionIds);
 
        /**
         * Trigger the checkpoint for the given task. The checkpoint is 
identified by the checkpoint ID
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 22d5df0..dba0e7d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -26,10 +26,12 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
@@ -95,7 +97,8 @@ public class SimpleAckingTaskManagerGateway implements 
TaskManagerGateway {
        }
 
        @Override
-       public void failPartition(ExecutionAttemptID executionAttemptID) {}
+       public void releasePartitions(Collection<ResultPartitionID> 
partitionIds) {
+       }
 
        @Override
        public void notifyCheckpointComplete(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 789956f..aca0bb2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
@@ -37,6 +38,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
@@ -123,7 +125,7 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
        }
 
        @Override
-       public void failPartition(ExecutionAttemptID executionAttemptID) {
+       public void releasePartitions(Collection<ResultPartitionID> 
partitionIds) {
                // noop
        }
 

Reply via email to