This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 23bd23b  [FLINK-13249][runtime] Fix handling of partition producer 
responses b… (#9138)
23bd23b is described below

commit 23bd23b325f15c726fcc48c76eb252fa2ed2fe59
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Thu Jul 18 10:37:22 2019 +0200

    [FLINK-13249][runtime] Fix handling of partition producer responses b… 
(#9138)
    
    * [FLINK-13249][runtime] Fix handling of partition producer responses by 
running them with the task's executor
    
    * Review comments
---
 .../partition/PartitionProducerStateProvider.java        |  9 +++++----
 .../io/network/partition/consumer/SingleInputGate.java   |  6 +++---
 .../java/org/apache/flink/runtime/taskmanager/Task.java  | 16 ++++++++++------
 .../partition/consumer/SingleInputGateBuilder.java       |  6 +-----
 .../org/apache/flink/runtime/taskmanager/TaskTest.java   |  8 ++++----
 5 files changed, 23 insertions(+), 22 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
index 8bbdaa5..5785095 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.types.Either;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * Request execution state of partition producer, the response accepts state 
check callbacks.
@@ -34,11 +34,12 @@ public interface PartitionProducerStateProvider {
         * @param intermediateDataSetId ID of the parent intermediate data set.
         * @param resultPartitionId ID of the result partition to check. This
         * identifies the producing execution and partition.
-        * @return a future with response handle.
+        * @param responseConsumer consumer for the response handle.
         */
-       CompletableFuture<? extends ResponseHandle> 
requestPartitionProducerState(
+       void requestPartitionProducerState(
                IntermediateDataSetID intermediateDataSetId,
-               ResultPartitionID resultPartitionId);
+               ResultPartitionID resultPartitionId,
+               Consumer<? super ResponseHandle> responseConsumer);
 
        /**
         * Result of state query, accepts state check callbacks.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index bd75262..534078d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -601,8 +601,8 @@ public class SingleInputGate extends InputGate {
        void triggerPartitionStateCheck(ResultPartitionID partitionId) {
                partitionProducerStateProvider.requestPartitionProducerState(
                        consumedResultId,
-                       partitionId)
-                       .thenAccept(responseHandle -> {
+                       partitionId,
+                       ((PartitionProducerStateProvider.ResponseHandle 
responseHandle) -> {
                                boolean isProducingState = new 
RemoteChannelStateChecker(partitionId, owningTaskName)
                                        
.isProducerReadyOrAbortConsumption(responseHandle);
                                if (isProducingState) {
@@ -612,7 +612,7 @@ public class SingleInputGate extends InputGate {
                                                
responseHandle.failConsumption(t);
                                        }
                                }
-                       });
+                       }));
        }
 
        private void queueChannel(InputChannel channel) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d4e1d8a..12049f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -99,6 +99,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -1080,18 +1081,21 @@ public class Task implements Runnable, TaskActions, 
PartitionProducerStateProvid
        // 
------------------------------------------------------------------------
 
        @Override
-       public CompletableFuture<PartitionProducerStateResponseHandle> 
requestPartitionProducerState(
+       public void requestPartitionProducerState(
                        final IntermediateDataSetID intermediateDataSetId,
-                       final ResultPartitionID resultPartitionId) {
+                       final ResultPartitionID resultPartitionId,
+                       Consumer<? super ResponseHandle> responseConsumer) {
+
                final CompletableFuture<ExecutionState> futurePartitionState =
                        
partitionProducerStateChecker.requestPartitionProducerState(
                                jobId,
                                intermediateDataSetId,
                                resultPartitionId);
-               final CompletableFuture<PartitionProducerStateResponseHandle> 
result =
-                       
futurePartitionState.handleAsync(PartitionProducerStateResponseHandle::new, 
executor);
-               FutureUtils.assertNoException(result);
-               return result;
+
+               FutureUtils.assertNoException(
+                       futurePartitionState
+                               
.handle(PartitionProducerStateResponseHandle::new)
+                               .thenAcceptAsync(responseConsumer, executor));
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 956bad9..944cc07 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -21,23 +21,19 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
-import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider.ResponseHandle;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Utility class to encapsulate the logic of building a {@link 
SingleInputGate} instance.
  */
 public class SingleInputGateBuilder {
 
-       private static final CompletableFuture<ResponseHandle> 
NO_OP_PRODUCER_CHECKER_RESULT = new CompletableFuture<>();
-
-       public static final PartitionProducerStateProvider 
NO_OP_PRODUCER_CHECKER = (dsid, id) -> NO_OP_PRODUCER_CHECKER_RESULT;
+       public static final PartitionProducerStateProvider 
NO_OP_PRODUCER_CHECKER = (dsid, id, consumer) -> {};
 
        private final IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ee78963..5879f52 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -656,7 +656,7 @@ public class TaskTest extends TestLogger {
                        final CompletableFuture<ExecutionState> promise = new 
CompletableFuture<>();
                        
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
 
-                       task.requestPartitionProducerState(resultId, 
partitionId).thenAccept(checkResult ->
+                       task.requestPartitionProducerState(resultId, 
partitionId, checkResult ->
                                
assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult),
 is(false))
                        );
 
@@ -680,7 +680,7 @@ public class TaskTest extends TestLogger {
                        final CompletableFuture<ExecutionState> promise = new 
CompletableFuture<>();
                        
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
 
-                       task.requestPartitionProducerState(resultId, 
partitionId).thenAccept(checkResult ->
+                       task.requestPartitionProducerState(resultId, 
partitionId, checkResult ->
                                
assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult),
 is(false))
                        );
 
@@ -711,7 +711,7 @@ public class TaskTest extends TestLogger {
                                CompletableFuture<ExecutionState> promise = new 
CompletableFuture<>();
                                
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
 
-                               task.requestPartitionProducerState(resultId, 
partitionId).thenAccept(checkResult -> {
+                               task.requestPartitionProducerState(resultId, 
partitionId, checkResult -> {
                                        if 
(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
                                                callCount.incrementAndGet();
                                        }
@@ -749,7 +749,7 @@ public class TaskTest extends TestLogger {
                                CompletableFuture<ExecutionState> promise = new 
CompletableFuture<>();
                                
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
 
-                               task.requestPartitionProducerState(resultId, 
partitionId).thenAccept(checkResult -> {
+                               task.requestPartitionProducerState(resultId, 
partitionId, checkResult -> {
                                        if 
(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
                                                callCount.incrementAndGet();
                                        }

Reply via email to