[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-28 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r288287665
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.PartitionStateChecker.CheckResult;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.types.Either;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Handles the result of {@link PartitionStateChecker}.
+ *
+ * The method {@code isProducerConsumerReadyOrAbortConsumption} determines 
whether the partition producer is
+ * in a producing state and consumer is ready for consumption.
+ */
+public class RemoteChannelStateChecker {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteChannelStateChecker.class);
+
+   private final ResultPartitionID resultPartitionId;
+
+   private final String taskNameWithSubtask;
+
+   public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, 
String taskNameWithSubtask) {
+   this.resultPartitionId = resultPartitionId;
+   this.taskNameWithSubtask = taskNameWithSubtask;
+   }
+
+   public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult 
checkResult) {
+   Either result = 
checkResult.getProducerExecutionState();
+   if (result.isLeft() || result.right() instanceof 
TimeoutException) {
+   boolean isProducerConsumerReady = 
isProducerConsumerReady(checkResult);
+   if (isProducerConsumerReady) {
+   return true;
+   } else {
+   
abortConsumptionOrIgnoreCheckResult(checkResult);
+   }
+   } else {
+   handleFailedCheckResult(checkResult);
+   }
+   return false;
+   }
+
+   private boolean isProducerConsumerReady(CheckResult checkResult) {
+   ExecutionState consumerState = 
checkResult.getConsumerExecutionState();
+   Either result = 
checkResult.getProducerExecutionState();
+   ExecutionState producerState = result.isLeft() ? result.left() 
: ExecutionState.RUNNING;
 
 Review comment:
   This logic is duplicated here and in `abortConsumptionOrIgnoreCheckResult`. 
Would be good to deduplicate it.
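
A minimal sketch of one way to deduplicate the producer-state fallback. The types here are simplified stand-ins rather than Flink's classes: `ProducerState` replaces `ExecutionState`, an `Optional` replaces the `Either` returned by `getProducerExecutionState()`, and `resolveProducerState` is a hypothetical helper name.

```java
import java.util.Optional;

// Simplified stand-in for Flink's ExecutionState (assumption: only the states named in the diff).
enum ProducerState { SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

final class ProducerStateResolver {
    private ProducerStateResolver() {}

    // Hypothetical helper: a reported state wins; if none was reported (for example
    // after a timeout), assume RUNNING. This mirrors the expression
    // `result.isLeft() ? result.left() : ExecutionState.RUNNING` from the diff.
    static ProducerState resolveProducerState(Optional<ProducerState> reported) {
        return reported.orElse(ProducerState.RUNNING);
    }

    public static void main(String[] args) {
        System.out.println(resolveProducerState(Optional.of(ProducerState.FAILED)));
        System.out.println(resolveProducerState(Optional.empty()));
    }
}
```

Both `isProducerConsumerReady` and `abortConsumptionOrIgnoreCheckResult` could then call the single helper instead of repeating the ternary.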


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-28 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r288290783
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.PartitionStateChecker.CheckResult;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.types.Either;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Handles the result of {@link PartitionStateChecker}.
+ *
+ * The method {@code isProducerConsumerReadyOrAbortConsumption} determines 
whether the partition producer is
+ * in a producing state and consumer is ready for consumption.
+ */
+public class RemoteChannelStateChecker {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteChannelStateChecker.class);
+
+   private final ResultPartitionID resultPartitionId;
+
+   private final String taskNameWithSubtask;
+
+   public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, 
String taskNameWithSubtask) {
+   this.resultPartitionId = resultPartitionId;
+   this.taskNameWithSubtask = taskNameWithSubtask;
+   }
+
+   public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult 
checkResult) {
 
 Review comment:
   Not super important and out of scope for this PR, but this method either triggers some action (fail or cancel) or returns a decision (trigger a new partition check or not). I think it would be more symmetric if this class did not trigger any action itself but only returned a decision about what to do:
   ```
   enum Action {
     FAIL(Throwable cause),
     CANCEL(String msg),
     TRIGGER_PARTITION_CHECK,
     NOOP
   }
   ```
   Then the caller would be responsible for carrying out the action. That way this class would only need access to `checkResult.getProducerExecutionState()` and not `checkResult` itself.
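
The enum in the comment is shorthand: Java enum constants cannot carry a different payload per use. A hedged sketch of how the same idea might be written as compilable Java follows. All names (`Action`, `Kind`, `ActionDemo`, `apply`) are hypothetical illustrations and not part of the PR.

```java
// Hypothetical decision type; the kinds follow the enum sketched in the review comment.
final class Action {
    enum Kind { FAIL, CANCEL, TRIGGER_PARTITION_CHECK, NOOP }

    private final Kind kind;
    private final Throwable cause; // set only for FAIL
    private final String message;  // set only for CANCEL

    private Action(Kind kind, Throwable cause, String message) {
        this.kind = kind;
        this.cause = cause;
        this.message = message;
    }

    static Action fail(Throwable cause) { return new Action(Kind.FAIL, cause, null); }
    static Action cancel(String message) { return new Action(Kind.CANCEL, null, message); }
    static Action triggerPartitionCheck() { return new Action(Kind.TRIGGER_PARTITION_CHECK, null, null); }
    static Action noop() { return new Action(Kind.NOOP, null, null); }

    Kind kind() { return kind; }
    Throwable cause() { return cause; }
    String message() { return message; }
}

final class ActionDemo {
    // The caller, not the checker, decides how to carry out the returned decision.
    static String apply(Action action) {
        switch (action.kind()) {
            case FAIL:
                return "fail: " + action.cause().getMessage();
            case CANCEL:
                return "cancel: " + action.message();
            case TRIGGER_PARTITION_CHECK:
                return "re-trigger partition check";
            default:
                return "nothing to do";
        }
    }

    public static void main(String[] args) {
        System.out.println(apply(Action.cancel("producer disposed")));
    }
}
```

With this shape the checker stays free of side effects, which also makes it straightforward to unit test.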




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-28 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r288283528
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateID.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import java.io.Serializable;
+
+/**
+ * Runtime identifier of a consumed {@link 
org.apache.flink.runtime.executiongraph.IntermediateResult}.
+ *
+ * In runtime the {@link 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID} is not enough to 
uniquely
 
 Review comment:
   nit: At runtime




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-28 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r288288062
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.PartitionStateChecker.CheckResult;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.types.Either;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Handles the result of {@link PartitionStateChecker}.
+ *
+ * The method {@code isProducerConsumerReadyOrAbortConsumption} determines 
whether the partition producer is
+ * in a producing state and consumer is ready for consumption.
+ */
+public class RemoteChannelStateChecker {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteChannelStateChecker.class);
+
+   private final ResultPartitionID resultPartitionId;
+
+   private final String taskNameWithSubtask;
+
+   public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, 
String taskNameWithSubtask) {
+   this.resultPartitionId = resultPartitionId;
+   this.taskNameWithSubtask = taskNameWithSubtask;
+   }
+
+   public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult 
checkResult) {
+   Either result = 
checkResult.getProducerExecutionState();
+   if (result.isLeft() || result.right() instanceof 
TimeoutException) {
+   boolean isProducerConsumerReady = 
isProducerConsumerReady(checkResult);
+   if (isProducerConsumerReady) {
+   return true;
+   } else {
+   
abortConsumptionOrIgnoreCheckResult(checkResult);
+   }
+   } else {
+   handleFailedCheckResult(checkResult);
+   }
+   return false;
+   }
+
+   private boolean isProducerConsumerReady(CheckResult checkResult) {
+   ExecutionState consumerState = 
checkResult.getConsumerExecutionState();
+   Either result = 
checkResult.getProducerExecutionState();
+   ExecutionState producerState = result.isLeft() ? result.left() 
: ExecutionState.RUNNING;
+   return consumerState == ExecutionState.RUNNING &&
+   (producerState == ExecutionState.SCHEDULED ||
+   producerState == ExecutionState.DEPLOYING ||
+   producerState == ExecutionState.RUNNING ||
+   producerState == ExecutionState.FINISHED);
+   }
+
+   private void abortConsumptionOrIgnoreCheckResult(CheckResult 
checkResult) {
+   ExecutionState consumerState = 
checkResult.getConsumerExecutionState();
+   Either result = 
checkResult.getProducerExecutionState();
+   ExecutionState producerState = result.isLeft() ? result.left() 
: ExecutionState.RUNNING;
+   if (consumerState == ExecutionState.RUNNING) {
+   if (producerState == ExecutionState.CANCELING ||
+   producerState == ExecutionState.CANCELED ||
+   producerState == ExecutionState.FAILED) {
+
+   // The producing execution has been canceled or 
failed. We
+   // don't need to re-trigger the request since 
it cannot
+   // succeed.
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Cancelling task {} after the 
producer 

[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-28 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r288283226
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionStateChecker.java
 ##
 @@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.types.Either;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Queries execution states of partition producer and consumer, accepts 
actions of state check result.
+ */
+public interface PartitionStateChecker {
+   /**
+* Triggers check of the producer and consumer execution states.
+*
+* @param intermediateDataSetId ID of the parent intermediate data set.
+* @param resultPartitionId ID of the result partition to check. This
+* identifies the producing execution and partition.
+* @return a future of check result.
+*/
+   CompletableFuture triggerPartitionProducerCheck(
+   IntermediateDataSetID intermediateDataSetId,
+   ResultPartitionID resultPartitionId);
+
+   /**
+* Result of partition state check, accepts check callbacks.
+*/
+   interface CheckResult {
+   ExecutionState getConsumerExecutionState();
 
 Review comment:
   Do we really need the consumer's execution state here?




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-28 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r288287374
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.PartitionStateChecker.CheckResult;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.types.Either;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Handles the result of {@link PartitionStateChecker}.
+ *
+ * The method {@code isProducerConsumerReadyOrAbortConsumption} determines 
whether the partition producer is
+ * in a producing state and consumer is ready for consumption.
+ */
+public class RemoteChannelStateChecker {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteChannelStateChecker.class);
+
+   private final ResultPartitionID resultPartitionId;
+
+   private final String taskNameWithSubtask;
+
+   public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, 
String taskNameWithSubtask) {
+   this.resultPartitionId = resultPartitionId;
+   this.taskNameWithSubtask = taskNameWithSubtask;
+   }
+
+   public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult 
checkResult) {
+   Either result = 
checkResult.getProducerExecutionState();
+   if (result.isLeft() || result.right() instanceof 
TimeoutException) {
+   boolean isProducerConsumerReady = 
isProducerConsumerReady(checkResult);
+   if (isProducerConsumerReady) {
+   return true;
+   } else {
+   
abortConsumptionOrIgnoreCheckResult(checkResult);
+   }
+   } else {
+   handleFailedCheckResult(checkResult);
+   }
+   return false;
+   }
+
+   private boolean isProducerConsumerReady(CheckResult checkResult) {
+   ExecutionState consumerState = 
checkResult.getConsumerExecutionState();
+   Either result = 
checkResult.getProducerExecutionState();
+   ExecutionState producerState = result.isLeft() ? result.left() 
: ExecutionState.RUNNING;
+   return consumerState == ExecutionState.RUNNING &&
 
 Review comment:
   Can we check the `consumerState` outside of the `RemoteChannelStateChecker`? 
If the consumer (==this) is not running, then we should simply ignore the 
update message.




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286541806
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception 
{
 
if (task != null) {
for (final PartitionInfo partitionInfo: partitionInfos) 
{
-   IntermediateDataSetID 
intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
-
-   final SingleInputGate singleInputGate = 
task.getInputGateById(intermediateResultPartitionID);
-
-   if (singleInputGate != null) {
-   // Run asynchronously because it might 
be blocking
-   getRpcService().execute(
-   () -> {
-   try {
-   
singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
-   } catch (IOException | 
InterruptedException e) {
-   
log.error("Could not update input data location for task {}. Trying to fail 
task.", task.getTaskInfo().getTaskName(), e);
-
-   try {
-   
task.failExternally(e);
-   } catch 
(RuntimeException re) {
-   // 
TODO: Check whether we need this or make exception in failExtenally checked
-   
log.error("Failed canceling task with execution ID {} after task update 
failure.", executionAttemptID, re);
-   }
-   }
-   });
-   } else {
-   return 
FutureUtils.completedExceptionally(
-   new PartitionException("No 
reader with ID " + intermediateResultPartitionID +
-   " for task " + 
executionAttemptID + " was found."));
-   }
+   // Run asynchronously because it might be 
blocking
+   getRpcService().execute(
+   () -> {
+   try {
+   
networkEnvironment.updatePartitionInfo(partitionInfo);
+   } catch (Throwable t) {
 
 Review comment:
   I think we should not catch `Throwable` here. If an unchecked exception 
occurs, it should simply bubble up and cause the component to fail.
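
To illustrate the point, a small self-contained sketch, not the PR's code: `Update` and `runUpdate` are hypothetical names. Only the expected checked exceptions are handled, while any unchecked exception propagates to the caller.

```java
import java.io.IOException;

final class NarrowCatchDemo {
    // Hypothetical stand-in for an operation like updatePartitionInfo(...).
    interface Update {
        void run() throws IOException, InterruptedException;
    }

    /**
     * Returns true if the update succeeded and false if an expected checked
     * failure was handled. Unchecked exceptions are deliberately not caught.
     */
    static boolean runUpdate(Update update) {
        try {
            update.run();
            return true;
        } catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
            }
            // In the real code this is where the task would be failed.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runUpdate(() -> { }));
        System.out.println(runUpdate(() -> { throw new IOException("connection lost"); }));
    }
}
```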




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286540299
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -441,6 +444,7 @@ public void close() throws IOException {
finally {
isReleased = true;
 
 Review comment:
   I think we could remove this field because we now have the `closeFuture`.




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286563093
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1221,53 +1214,46 @@ public void run() {
 */
@VisibleForTesting
void onPartitionStateUpdate(
-   IntermediateDataSetID intermediateDataSetId,
ResultPartitionID resultPartitionId,
-   ExecutionState producerState) throws IOException, 
InterruptedException {
+   ExecutionState producerState,
+   CompletableFuture producerReadyFuture) {
 
 Review comment:
   Instead of passing in this future, I would return a boolean indicating whether the producer is ready.
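
A rough sketch of that shape, using stand-in types: `State` replaces Flink's `ExecutionState`, and `isProducerReady` and `check` are hypothetical names. The state handler returns a plain boolean, and the caller derives the future from it.

```java
import java.util.concurrent.CompletableFuture;

final class ReadyCheckDemo {
    // Simplified stand-in for ExecutionState (assumption).
    enum State { SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    // Returns the decision directly instead of completing a future passed in by the caller.
    static boolean isProducerReady(State producerState) {
        return producerState == State.SCHEDULED
            || producerState == State.DEPLOYING
            || producerState == State.RUNNING
            || producerState == State.FINISHED;
    }

    // The caller owns the future and maps the state to the boolean result.
    static CompletableFuture<Boolean> check(CompletableFuture<State> producerStateFuture) {
        return producerStateFuture.thenApply(ReadyCheckDemo::isProducerReady);
    }

    public static void main(String[] args) {
        System.out.println(check(CompletableFuture.completedFuture(State.RUNNING)).join());
        System.out.println(check(CompletableFuture.completedFuture(State.FAILED)).join());
    }
}
```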




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286560412
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
} else {
failExternally(throwable);
}
-   } catch (IOException | InterruptedException e) {
-   failExternally(e);
+   } catch (Throwable t) {
+   
failExternally(stripCompletionException(t));
}
},
executor);
+
+   return producerReadyFuture;
 
 Review comment:
   What about the following alternative:
   ```
   final CompletableFuture<Boolean> producerReadyFuture = new CompletableFuture<>();

   FutureUtils.assertNoException(
       futurePartitionState.whenCompleteAsync(
           (ExecutionState executionState, Throwable throwable) -> {
               if (executionState != null || throwable instanceof TimeoutException) {
                   final boolean producerReady = onPartitionStateUpdate(
                       resultPartitionId,
                       executionState != null ? executionState : ExecutionState.RUNNING);

                   producerReadyFuture.complete(producerReady);
               } else {
                   if (throwable instanceof PartitionProducerDisposedException) {
                       String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.",
                           resultPartitionId.getProducerId(), resultPartitionId.getPartitionId());
                       LOG.info(msg, throwable);
                       cancelExecution();
                   } else {
                       failExternally(throwable);
                   }

                   producerReadyFuture.complete(false);
               }
           },
           executor));

   return producerReadyFuture;
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286551026
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
} else {
failExternally(throwable);
}
-   } catch (IOException | InterruptedException e) {
-   failExternally(e);
+   } catch (Throwable t) {
 
 Review comment:
   We should not catch `Throwable` because it could indicate a legitimate Flink problem that should cause the process to fail.




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286565922
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1221,53 +1214,46 @@ public void run() {
 */
@VisibleForTesting
void onPartitionStateUpdate(
 
 Review comment:
   Maybe rename to `isProducerReadyAndHandlePartitionStateUpdate`.




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286546519
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception 
{
 
if (task != null) {
for (final PartitionInfo partitionInfo: partitionInfos) 
{
-   IntermediateDataSetID 
intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
-
-   final SingleInputGate singleInputGate = 
task.getInputGateById(intermediateResultPartitionID);
-
-   if (singleInputGate != null) {
-   // Run asynchronously because it might 
be blocking
-   getRpcService().execute(
-   () -> {
-   try {
-   
singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
-   } catch (IOException | 
InterruptedException e) {
-   
log.error("Could not update input data location for task {}. Trying to fail 
task.", task.getTaskInfo().getTaskName(), e);
-
-   try {
-   
task.failExternally(e);
-   } catch 
(RuntimeException re) {
-   // 
TODO: Check whether we need this or make exception in failExtenally checked
-   
log.error("Failed canceling task with execution ID {} after task update 
failure.", executionAttemptID, re);
-   }
-   }
-   });
-   } else {
-   return 
FutureUtils.completedExceptionally(
-   new PartitionException("No 
reader with ID " + intermediateResultPartitionID +
-   " for task " + 
executionAttemptID + " was found."));
-   }
+   // Run asynchronously because it might be 
blocking
+   getRpcService().execute(
+   () -> {
+   try {
+   
networkEnvironment.updatePartitionInfo(partitionInfo);
+   } catch (Throwable t) {
+   log.error(
+   "Could not 
update input data location for task {}. Trying to fail task.",
+   
task.getTaskInfo().getTaskName(),
+   t);
+   
FutureUtils.assertNoException(CompletableFuture.runAsync(
+   () -> 
task.failExternally(t),
+   
getRpcService().getExecutor()));
+   }
+   });
 
 Review comment:
   I would suggest replacing this block with:
   ```
   FutureUtils.assertNoException(
       CompletableFuture.runAsync(
           () -> {
               try {
                   networkEnvironment.updatePartitionInfo(partitionInfo);
               } catch (IOException | InterruptedException e) {
                   log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
                   task.failExternally(e);
               }
           },
           getRpcService().getExecutor()));
   ```
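
The point of wrapping the future this way is that expected failures are handled inside the asynchronous block, while anything else that escapes it is escalated rather than silently dropped. A minimal JDK-only sketch of that idea follows. The `assertNoException` helper here is a hypothetical stand-in written for illustration, not Flink's `FutureUtils` implementation, and the `fatalErrors` list and the `IllegalStateException` branch are assumptions that replace the real fatal-error handling and the `IOException | InterruptedException` branch.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical stand-in for illustration only; not Flink's FutureUtils.
final class AssertNoExceptionSketch {

    // Collects unexpected failures; a real system would escalate them instead.
    static final List<Throwable> fatalErrors = new CopyOnWriteArrayList<>();

    // Attaches a handler that records any exception the future completes with.
    static void assertNoException(CompletableFuture<?> future) {
        future.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                fatalErrors.add(throwable);
            }
        });
    }

    // Runs the update asynchronously. Expected failures are handled inside the
    // block, so anything that reaches assertNoException is unexpected.
    static CompletableFuture<Void> runUpdate(Runnable update, Runnable failTask, Executor executor) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(
            () -> {
                try {
                    update.run();
                } catch (IllegalStateException expected) {
                    // Stands in for the IOException | InterruptedException branch.
                    failTask.run();
                }
            },
            executor);
        assertNoException(future);
        return future;
    }
}
```

With this shape, a failure thrown by the task-failing call itself also ends up in the escalation path, which is what makes a separate inner `catch (RuntimeException re)` unnecessary.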


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286568353
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -97,13 +104,15 @@ private NetworkEnvironment(
         NetworkBufferPool networkBufferPool,
         ConnectionManager connectionManager,
         ResultPartitionManager resultPartitionManager,
+        Map inputGatesById,
 
 Review comment:
   Passing the map for storing the input gates into `NetworkEnvironment` 
exposes, in my opinion, too many implementation details of this structure. 
It basically says that the object needs to store the `SingleInputGates` in this 
structure; otherwise the test will fail. I think it would be better to have a 
`getInputGate(IntermediateDataSetID)` method and let the way the gates are 
stored internally be an implementation detail.
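
A minimal sketch of the narrower accessor, assuming simplified stand-in types: plain `String` values replace `IntermediateDataSetID` and `SingleInputGate`, and the class name `GateRegistrySketch` is hypothetical. The only point is that the storage stays private and callers see a single lookup method.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the environment that owns the gates.
final class GateRegistrySketch {

    // Storage stays private, so callers cannot depend on it being a map.
    private final Map<String, String> gatesById = new ConcurrentHashMap<>();

    void register(String dataSetId, String gate) {
        gatesById.put(dataSetId, gate);
    }

    void unregister(String dataSetId) {
        gatesById.remove(dataSetId);
    }

    // The only lookup exposed to callers and tests.
    Optional<String> getInputGate(String dataSetId) {
        return Optional.ofNullable(gatesById.get(dataSetId));
    }
}
```

A test can then assert on `getInputGate(id)` before and after closing the gate without knowing how the gates are stored.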




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-21 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285924862
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -614,34 +612,23 @@ private void stopTaskExecutorServices() throws Exception {
 
     if (task != null) {
         for (final PartitionInfo partitionInfo: partitionInfos) {
-            IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
-
-            final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
+            // Run asynchronously because it might be blocking
+            getRpcService().execute(
+                () -> {
+                    try {
+                        networkEnvironment.updatePartitionInfo(partitionInfo);
+                    } catch (IOException | PartitionException | InterruptedException e) {
 
 Review comment:
   I think that before this change it would send a `PartitionException` back 
to the `JobMaster`, which would then fail the `Execution`. This should have the 
same effect. Verifying whether this is guarded by a test makes sense, though.




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285663137
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -614,34 +612,23 @@ private void stopTaskExecutorServices() throws Exception {
 
     if (task != null) {
         for (final PartitionInfo partitionInfo: partitionInfos) {
-            IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
-
-            final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
+            // Run asynchronously because it might be blocking
+            getRpcService().execute(
+                () -> {
+                    try {
+                        networkEnvironment.updatePartitionInfo(partitionInfo);
+                    } catch (IOException | PartitionException | InterruptedException e) {
+                        log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
 
-            if (singleInputGate != null) {
-                // Run asynchronously because it might be blocking
-                getRpcService().execute(
-                    () -> {
                         try {
-                            singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
-                        } catch (IOException | InterruptedException e) {
-                            log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
-
-                            try {
-                                task.failExternally(e);
-                            } catch (RuntimeException re) {
-                                // TODO: Check whether we need this or make exception in failExtenally checked
-                                log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
-                            }
+                            task.failExternally(e);
+                        } catch (RuntimeException re) {
 
 Review comment:
   I would suggest removing this catch block and doing the following instead:
   ```
   FutureUtils.assertNoException(
 CompletableFuture.runAsync(() -> ..., getRpcService()));
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285662172
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -331,6 +346,24 @@ private void registerInputMetrics(MetricGroup inputGroup, MetricGroup buffersGro
         buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
     }
 
+    /**
+     * Update consuming gate with newly available partition.
+     *
+     * @param partitionInfo telling where the partition can be retrieved from
+     * @throws IOException IO problem by the update
+     * @throws InterruptedException potentially blocking operation was interrupted
+     * @throws PartitionException the input gate with the id from the partitionInfo is not found
+     */
+    public void updatePartitionInfo(PartitionInfo partitionInfo)
+        throws IOException, InterruptedException, PartitionException {
+        IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
+        SingleInputGate inputGate = inputGatesById.get(intermediateResultPartitionID);
+        if (inputGate == null) {
+            throw new PartitionException("No reader with ID " + intermediateResultPartitionID + " was found.");
 
 Review comment:
   Should this maybe be an `IllegalStateException` because this should actually 
not happen?
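
A small sketch of the unchecked variant, under the assumption that a missing gate indicates a broken invariant rather than a condition callers are expected to recover from. The class `GateLookupSketch` and the `String` stand-ins for the ID and gate types are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in; String replaces the real ID and gate types.
final class GateLookupSketch {

    private final Map<String, String> inputGatesById = new HashMap<>();

    void register(String id, String gate) {
        inputGatesById.put(id, gate);
    }

    // Throws an unchecked exception, so callers need no throws clause for a
    // situation that should not occur in a correctly wired system.
    String requireGate(String id) {
        String gate = inputGatesById.get(id);
        if (gate == null) {
            throw new IllegalStateException("No reader with ID " + id + " was found.");
        }
        return gate;
    }
}
```

The trade-off is that the failure is no longer visible in the method signature, so it only fits if the missing gate really cannot happen during normal operation.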




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285638898
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -110,6 +117,8 @@ public NetworkEnvironment(
 
this.resultPartitionManager = new ResultPartitionManager();
 
+   this.inputGatesById = new ConcurrentHashMap<>();
 
 Review comment:
   Which threads access this structure concurrently?




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285640957
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -634,7 +633,8 @@ void notifyChannelNonEmpty(InputChannel channel) {
     }
 
     void triggerPartitionStateCheck(ResultPartitionID partitionId) {
-        taskActions.triggerPartitionProducerStateCheck(jobId, consumedResultId, partitionId);
+        taskActions.triggerPartitionProducerStateCheck(consumedResultId, partitionId,
+            () -> retriggerPartitionRequest(partitionId.getPartitionId()));
 
 Review comment:
   Line breaking




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285644704
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -542,6 +543,29 @@ public void testUpdateUnknownInputChannel() throws Exception {
         }
     }
 
+    @Test
+    public void checkInputGateRemoveInNetworkEnvironment() throws IOException {
+        NetworkEnvironment network = createNetworkEnvironment();
+        InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
+            // Local
+            new InputChannelDeploymentDescriptor(
+                new ResultPartitionID(),
+                ResultPartitionLocation.createLocal())};
+        IntermediateDataSetID id = new IntermediateDataSetID();
+        InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(id,
+            ResultPartitionType.PIPELINED, 0, channelDescs);
+        SingleInputGate[] inputGates = network.createInputGates("",
+            new NoOpTaskActions(), Collections.singletonList(igdd),
+            new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(),
+            new SimpleCounter());
+        assertEquals(1, inputGates.length);
+        assertEquals(1, network.getInputGatesById().size());
+        assertTrue(network.getInputGatesById().containsKey(id));
+        inputGates[0].close();
+        assertEquals(0, network.getInputGatesById().size());
+        assertFalse(network.getInputGatesById().containsKey(id));
 
 Review comment:
   The last two assertions seem to be redundant.




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285644373
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -542,6 +543,29 @@ public void testUpdateUnknownInputChannel() throws Exception {
         }
     }
 
+    @Test
+    public void checkInputGateRemoveInNetworkEnvironment() throws IOException {
+        NetworkEnvironment network = createNetworkEnvironment();
+        InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
+            // Local
+            new InputChannelDeploymentDescriptor(
+                new ResultPartitionID(),
+                ResultPartitionLocation.createLocal())};
+        IntermediateDataSetID id = new IntermediateDataSetID();
+        InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(id,
+            ResultPartitionType.PIPELINED, 0, channelDescs);
+        SingleInputGate[] inputGates = network.createInputGates("",
+            new NoOpTaskActions(), Collections.singletonList(igdd),
+            new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(),
+            new SimpleCounter());
 
 Review comment:
   line breaks




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285644496
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -542,6 +543,29 @@ public void testUpdateUnknownInputChannel() throws Exception {
         }
     }
 
+    @Test
+    public void checkInputGateRemoveInNetworkEnvironment() throws IOException {
+        NetworkEnvironment network = createNetworkEnvironment();
+        InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
+            // Local
+            new InputChannelDeploymentDescriptor(
+                new ResultPartitionID(),
+                ResultPartitionLocation.createLocal())};
+        IntermediateDataSetID id = new IntermediateDataSetID();
+        InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(id,
+            ResultPartitionType.PIPELINED, 0, channelDescs);
+        SingleInputGate[] inputGates = network.createInputGates("",
+            new NoOpTaskActions(), Collections.singletonList(igdd),
+            new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(),
+            new SimpleCounter());
+        assertEquals(1, inputGates.length);
+        assertEquals(1, network.getInputGatesById().size());
+        assertTrue(network.getInputGatesById().containsKey(id));
+        inputGates[0].close();
+        assertEquals(0, network.getInputGatesById().size());
+        assertFalse(network.getInputGatesById().containsKey(id));
 
 Review comment:
   I'd suggest using Hamcrest for new tests.




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285660972
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -185,22 +181,24 @@
 
private final Counter numBytesIn;
 
+   private final Runnable closeListener;
+
public SingleInputGate(
String owningTaskName,
-   JobID jobId,
IntermediateDataSetID consumedResultId,
final ResultPartitionType consumedPartitionType,
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
Counter numBytesIn,
-   boolean isCreditBased) {
+   boolean isCreditBased,
+   Runnable closeListener) {
 
 Review comment:
   For the close listener we could maybe apply a trick similar to the one used 
for the partition request retriggering. We could, for example, add a termination 
future to the `SingleInputGate` which is completed once the gate gets closed. On 
this future we can register the removal from the `inputGatesById` map. WDYT?
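
A minimal sketch of the termination-future idea, using simplified stand-ins: `Gate` and `Environment` are hypothetical classes replacing `SingleInputGate` and the owner of the `inputGatesById` map, `String` replaces the ID type, and `getCloseFuture` is an assumed method name.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class TerminationFutureSketch {

    // Simplified stand-in for SingleInputGate.
    static final class Gate implements AutoCloseable {
        private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        CompletableFuture<Void> getCloseFuture() {
            return closeFuture;
        }

        @Override
        public void close() {
            // Completing an already completed future is a no-op, so close() is idempotent.
            closeFuture.complete(null);
        }
    }

    // Simplified stand-in for the owner of the inputGatesById map.
    static final class Environment {
        private final Map<String, Gate> inputGatesById = new ConcurrentHashMap<>();

        Gate createGate(String id) {
            Gate gate = new Gate();
            inputGatesById.put(id, gate);
            // The removal is registered on the future, so the gate needs no listener argument.
            gate.getCloseFuture().thenRun(() -> inputGatesById.remove(id, gate));
            return gate;
        }

        boolean contains(String id) {
            return inputGatesById.containsKey(id);
        }
    }
}
```

The gate constructor stays free of a `Runnable closeListener` parameter, and the owner of the map decides what happens on close.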




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285639189
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -150,6 +159,11 @@ public NetworkEnvironmentConfiguration getConfiguration() {
         return config;
     }
 
+    @VisibleForTesting
+    public Map getInputGatesById() {
 
 Review comment:
   Do we need to expose the map or would it be enough to have 
`getInputGate(IntermediateDataSetID)`?




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285640257
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -331,6 +346,24 @@ private void registerInputMetrics(MetricGroup inputGroup, MetricGroup buffersGro
         buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
     }
 
+    /**
+     * Update consuming gate with newly available partition.
+     *
+     * @param partitionInfo telling where the partition can be retrieved from
+     * @throws IOException IO problem by the update
+     * @throws InterruptedException potentially blocking operation was interrupted
+     * @throws PartitionException the input gate with the id from the partitionInfo is not found
+     */
+    public void updatePartitionInfo(PartitionInfo partitionInfo)
+        throws IOException, InterruptedException, PartitionException {
 
 Review comment:
   Personally I'd prefer to put `throws` on the same line as `)`. The problem 
is that it now looks as if the `throws` line belongs to the body. Just personal 
taste, though.




[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-20 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285658953
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
 ##
 @@ -30,15 +30,15 @@
     /**
      * Check the execution state of the execution producing a result partition.
      *
-     * @param jobId ID of the job the partition belongs to.
      * @param intermediateDataSetId ID of the parent intermediate data set.
      * @param resultPartitionId ID of the result partition to check. This
      * identifies the producing execution and partition.
+     * @param producerReadyCallback callback of producer is in one of consumable states.
      */
     void triggerPartitionProducerStateCheck(
-        JobID jobId,
         IntermediateDataSetID intermediateDataSetId,
-        ResultPartitionID resultPartitionId);
+        ResultPartitionID resultPartitionId,
+        ThrowingRunnable producerReadyCallback);
 
 Review comment:
   Instead of adding this callback, we could also let this method return a 
`CompletableFuture` which, once completed, signals that the partition request 
should be retriggered. That way we would not have to pass in a callback which 
is then forwarded to some other place.
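
A minimal sketch of the future-returning variant. The element type of the future is not visible in the comment as archived, so the sketch assumes `CompletableFuture<Void>`, where normal completion alone is the signal to retrigger. `Actions` and `Gate` are hypothetical stand-ins for `TaskActions` and `SingleInputGate`, and `String` replaces the ID types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class ProducerStateCheckSketch {

    // Stand-in for TaskActions; the Void result type is an assumption.
    interface Actions {
        CompletableFuture<Void> triggerPartitionProducerStateCheck(String dataSetId, String partitionId);
    }

    // Stand-in for the gate that wants to retrigger its partition request.
    static final class Gate {
        private final Actions actions;
        final AtomicInteger retriggerCount = new AtomicInteger();

        Gate(Actions actions) {
            this.actions = actions;
        }

        void triggerPartitionStateCheck(String dataSetId, String partitionId) {
            // The caller decides what to do on completion; nothing is passed
            // into the actions object and forwarded elsewhere.
            actions.triggerPartitionProducerStateCheck(dataSetId, partitionId)
                .thenRun(retriggerCount::incrementAndGet);
        }
    }
}
```

If the future completes exceptionally, `thenRun` does not fire, so a failed state check does not retrigger the request in this sketch.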

