[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r288287665 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.PartitionStateChecker; +import org.apache.flink.runtime.io.network.partition.PartitionStateChecker.CheckResult; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.types.Either; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeoutException; + +/** + * Handles the result of {@link PartitionStateChecker}. + * + * The method {@code isProducerConsumerReadyOrAbortConsumption} determines whether the partition producer is + * in a producing state and consumer is ready for consumption. 
+ */ +public class RemoteChannelStateChecker { + private static final Logger LOG = LoggerFactory.getLogger(RemoteChannelStateChecker.class); + + private final ResultPartitionID resultPartitionId; + + private final String taskNameWithSubtask; + + public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, String taskNameWithSubtask) { + this.resultPartitionId = resultPartitionId; + this.taskNameWithSubtask = taskNameWithSubtask; + } + + public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult checkResult) { + Either result = checkResult.getProducerExecutionState(); + if (result.isLeft() || result.right() instanceof TimeoutException) { + boolean isProducerConsumerReady = isProducerConsumerReady(checkResult); + if (isProducerConsumerReady) { + return true; + } else { + abortConsumptionOrIgnoreCheckResult(checkResult); + } + } else { + handleFailedCheckResult(checkResult); + } + return false; + } + + private boolean isProducerConsumerReady(CheckResult checkResult) { + ExecutionState consumerState = checkResult.getConsumerExecutionState(); + Either result = checkResult.getProducerExecutionState(); + ExecutionState producerState = result.isLeft() ? result.left() : ExecutionState.RUNNING; Review comment: This logic is duplicated here and in `abortConsumptionOrIgnoreCheckResult`. Would be good to deduplicate it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
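One possible shape for the deduplication asked for above, as a minimal sketch: resolve the producer state in one place and pass the result to both checks. `State`, `ProducerStates`, `resolve`, and `isReady` are names invented for this illustration, and `Optional` stands in for Flink's `Either`; none of this is the PR's code.

```java
import java.util.Optional;

// Stand-in for Flink's ExecutionState, reduced to the values used in this review thread.
enum State { SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

// Hypothetical helper; the names are illustrative and not part of the PR.
final class ProducerStates {
    private ProducerStates() {}

    // The single place that encodes "no reported state (e.g. after a timeout) counts as RUNNING".
    static State resolve(Optional<State> reported) {
        return reported.orElse(State.RUNNING);
    }

    // Readiness condition as written in isProducerConsumerReady in the diff.
    static boolean isReady(State consumer, State producer) {
        return consumer == State.RUNNING
            && (producer == State.SCHEDULED
                || producer == State.DEPLOYING
                || producer == State.RUNNING
                || producer == State.FINISHED);
    }
}
```

With a helper like this, both `isProducerConsumerReady` and `abortConsumptionOrIgnoreCheckResult` could take the already resolved state as a parameter instead of unpacking the `Either` themselves.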
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r288290783 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.PartitionStateChecker; +import org.apache.flink.runtime.io.network.partition.PartitionStateChecker.CheckResult; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.types.Either; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeoutException; + +/** + * Handles the result of {@link PartitionStateChecker}. + * + * The method {@code isProducerConsumerReadyOrAbortConsumption} determines whether the partition producer is + * in a producing state and consumer is ready for consumption. 
+ */
+public class RemoteChannelStateChecker {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteChannelStateChecker.class);
+
+    private final ResultPartitionID resultPartitionId;
+
+    private final String taskNameWithSubtask;
+
+    public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, String taskNameWithSubtask) {
+        this.resultPartitionId = resultPartitionId;
+        this.taskNameWithSubtask = taskNameWithSubtask;
+    }
+
+    public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult checkResult) {

Review comment: Not super important and out of scope for this PR, but this method either triggers some action (fail or cancel) or returns a decision (trigger a new partition check or not). I think it would be more symmetric if this class did not trigger any action but only returned a decision about what to do:

```
enum Action {
    FAIL(Throwable cause),
    CANCEL(String msg),
    TRIGGER_PARTITION_CHECK,
    NOOP
}
```

Then the caller would be responsible for carrying out the action. That way this class would only need access to `checkResult.getProducerExecutionState()` and not `checkResult` itself.
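Java enum constants cannot carry a per-call payload in the way `FAIL(Throwable cause)` is written in the comment, so a decision type along these lines would probably need a small value class next to the enum. The following is only a sketch under that assumption; `ActionKind`, `CheckAction`, and `describe` are hypothetical names, and `describe` merely stands in for a caller that carries out the decision.

```java
// Hypothetical decision type; none of these names exist in the PR.
enum ActionKind { FAIL, CANCEL, TRIGGER_PARTITION_CHECK, NOOP }

final class CheckAction {
    final ActionKind kind;
    final Throwable cause;  // set only for FAIL
    final String message;   // set only for CANCEL

    private CheckAction(ActionKind kind, Throwable cause, String message) {
        this.kind = kind;
        this.cause = cause;
        this.message = message;
    }

    static CheckAction fail(Throwable cause) {
        return new CheckAction(ActionKind.FAIL, cause, null);
    }

    static CheckAction cancel(String message) {
        return new CheckAction(ActionKind.CANCEL, null, message);
    }

    static CheckAction triggerPartitionCheck() {
        return new CheckAction(ActionKind.TRIGGER_PARTITION_CHECK, null, null);
    }

    static CheckAction noop() {
        return new CheckAction(ActionKind.NOOP, null, null);
    }

    // Stand-in for the caller: the caller, not the checker, acts on the decision.
    static String describe(CheckAction action) {
        switch (action.kind) {
            case FAIL:
                return "fail: " + action.cause.getMessage();
            case CANCEL:
                return "cancel: " + action.message;
            case TRIGGER_PARTITION_CHECK:
                return "trigger partition check";
            default:
                return "noop";
        }
    }
}
```

The design point is the one made in the comment: the checker becomes a pure function from producer state to decision, which is easier to test than a class that fails or cancels the task itself.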
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r288283528 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateID.java ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import java.io.Serializable; + +/** + * Runtime identifier of a consumed {@link org.apache.flink.runtime.executiongraph.IntermediateResult}. + * + * In runtime the {@link org.apache.flink.runtime.jobgraph.IntermediateDataSetID} is not enough to uniquely Review comment: nit: At runtime This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r288288062 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.PartitionStateChecker; +import org.apache.flink.runtime.io.network.partition.PartitionStateChecker.CheckResult; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.types.Either; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeoutException; + +/** + * Handles the result of {@link PartitionStateChecker}. + * + * The method {@code isProducerConsumerReadyOrAbortConsumption} determines whether the partition producer is + * in a producing state and consumer is ready for consumption. 
+ */ +public class RemoteChannelStateChecker { + private static final Logger LOG = LoggerFactory.getLogger(RemoteChannelStateChecker.class); + + private final ResultPartitionID resultPartitionId; + + private final String taskNameWithSubtask; + + public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, String taskNameWithSubtask) { + this.resultPartitionId = resultPartitionId; + this.taskNameWithSubtask = taskNameWithSubtask; + } + + public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult checkResult) { + Either result = checkResult.getProducerExecutionState(); + if (result.isLeft() || result.right() instanceof TimeoutException) { + boolean isProducerConsumerReady = isProducerConsumerReady(checkResult); + if (isProducerConsumerReady) { + return true; + } else { + abortConsumptionOrIgnoreCheckResult(checkResult); + } + } else { + handleFailedCheckResult(checkResult); + } + return false; + } + + private boolean isProducerConsumerReady(CheckResult checkResult) { + ExecutionState consumerState = checkResult.getConsumerExecutionState(); + Either result = checkResult.getProducerExecutionState(); + ExecutionState producerState = result.isLeft() ? result.left() : ExecutionState.RUNNING; + return consumerState == ExecutionState.RUNNING && + (producerState == ExecutionState.SCHEDULED || + producerState == ExecutionState.DEPLOYING || + producerState == ExecutionState.RUNNING || + producerState == ExecutionState.FINISHED); + } + + private void abortConsumptionOrIgnoreCheckResult(CheckResult checkResult) { + ExecutionState consumerState = checkResult.getConsumerExecutionState(); + Either result = checkResult.getProducerExecutionState(); + ExecutionState producerState = result.isLeft() ? 
result.left() : ExecutionState.RUNNING; + if (consumerState == ExecutionState.RUNNING) { + if (producerState == ExecutionState.CANCELING || + producerState == ExecutionState.CANCELED || + producerState == ExecutionState.FAILED) { + + // The producing execution has been canceled or failed. We + // don't need to re-trigger the request since it cannot + // succeed. + if (LOG.isDebugEnabled()) { + LOG.debug("Cancelling task {} after the producer
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r288283226 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionStateChecker.java ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.types.Either; + +import java.util.concurrent.CompletableFuture; + +/** + * Queries execution states of partition producer and consumer, accepts actions of state check result. + */ +public interface PartitionStateChecker { + /** +* Triggers check of the producer and consumer execution states. +* +* @param intermediateDataSetId ID of the parent intermediate data set. +* @param resultPartitionId ID of the result partition to check. This +* identifies the producing execution and partition. +* @return a future of check result. 
+*/ + CompletableFuture triggerPartitionProducerCheck( + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId); + + /** +* Result of partition state check, accepts check callbacks. +*/ + interface CheckResult { + ExecutionState getConsumerExecutionState(); Review comment: Do we really need the consumer's execution state here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r288287374 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.PartitionStateChecker; +import org.apache.flink.runtime.io.network.partition.PartitionStateChecker.CheckResult; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.types.Either; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeoutException; + +/** + * Handles the result of {@link PartitionStateChecker}. + * + * The method {@code isProducerConsumerReadyOrAbortConsumption} determines whether the partition producer is + * in a producing state and consumer is ready for consumption. 
+ */ +public class RemoteChannelStateChecker { + private static final Logger LOG = LoggerFactory.getLogger(RemoteChannelStateChecker.class); + + private final ResultPartitionID resultPartitionId; + + private final String taskNameWithSubtask; + + public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, String taskNameWithSubtask) { + this.resultPartitionId = resultPartitionId; + this.taskNameWithSubtask = taskNameWithSubtask; + } + + public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult checkResult) { + Either result = checkResult.getProducerExecutionState(); + if (result.isLeft() || result.right() instanceof TimeoutException) { + boolean isProducerConsumerReady = isProducerConsumerReady(checkResult); + if (isProducerConsumerReady) { + return true; + } else { + abortConsumptionOrIgnoreCheckResult(checkResult); + } + } else { + handleFailedCheckResult(checkResult); + } + return false; + } + + private boolean isProducerConsumerReady(CheckResult checkResult) { + ExecutionState consumerState = checkResult.getConsumerExecutionState(); + Either result = checkResult.getProducerExecutionState(); + ExecutionState producerState = result.isLeft() ? result.left() : ExecutionState.RUNNING; + return consumerState == ExecutionState.RUNNING && Review comment: Can we check the `consumerState` outside of the `RemoteChannelStateChecker`? If the consumer (==this) is not running, then we should simply ignore the update message. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286541806 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ## @@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception { if (task != null) { for (final PartitionInfo partitionInfo: partitionInfos) { - IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID(); - - final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID); - - if (singleInputGate != null) { - // Run asynchronously because it might be blocking - getRpcService().execute( - () -> { - try { - singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor()); - } catch (IOException | InterruptedException e) { - log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e); - - try { - task.failExternally(e); - } catch (RuntimeException re) { - // TODO: Check whether we need this or make exception in failExtenally checked - log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re); - } - } - }); - } else { - return FutureUtils.completedExceptionally( - new PartitionException("No reader with ID " + intermediateResultPartitionID + - " for task " + executionAttemptID + " was found.")); - } + // Run asynchronously because it might be blocking + getRpcService().execute( + () -> { + try { + networkEnvironment.updatePartitionInfo(partitionInfo); + } catch (Throwable t) { Review comment: I think we should not catch `Throwable` here. If an unchecked exception occurs, it should simply bubble up and cause the component to fail. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286540299

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ##
@@ -441,6 +444,7 @@ public void close() throws IOException {
     finally {
         isReleased = true;

Review comment: I think we could remove this field because we now have the `closeFuture`.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286563093 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1221,53 +1214,46 @@ public void run() { */ @VisibleForTesting void onPartitionStateUpdate( - IntermediateDataSetID intermediateDataSetId, ResultPartitionID resultPartitionId, - ExecutionState producerState) throws IOException, InterruptedException { + ExecutionState producerState, + CompletableFuture producerReadyFuture) { Review comment: Instead of passing in this future, I would return a boolean indicating whether the producer is ready or not This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
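The difference between the two shapes can be sketched as follows. `ProducerReadySketch`, `isProducerReady`, and `checkAsync` are hypothetical names, the state is reduced to a string, and the readiness rule is a placeholder; the only point is that the handler returns a plain boolean while the caller owns the future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not code from the PR.
final class ProducerReadySketch {
    private ProducerReadySketch() {}

    // The handler returns its decision and never sees a future.
    // The readiness rule here is a placeholder, not Flink's actual condition.
    static boolean isProducerReady(String producerState) {
        return "RUNNING".equals(producerState) || "FINISHED".equals(producerState);
    }

    // The caller maps the asynchronous state lookup to the handler's decision.
    static CompletableFuture<Boolean> checkAsync(CompletableFuture<String> futureState) {
        return futureState.thenApply(ProducerReadySketch::isProducerReady);
    }
}
```

Keeping the future out of the handler's signature means the handler can be tested synchronously with plain values.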
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286560412

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ##
@@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
         } else {
             failExternally(throwable);
         }
-    } catch (IOException | InterruptedException e) {
-        failExternally(e);
+    } catch (Throwable t) {
+        failExternally(stripCompletionException(t));
     }
 }, executor);
+
+ return producerReadyFuture;

Review comment: What about the following alternative:

```
final CompletableFuture<Boolean> producerReadyFuture = new CompletableFuture<>();

FutureUtils.assertNoException(
    futurePartitionState.whenCompleteAsync(
        (ExecutionState executionState, Throwable throwable) -> {
            if (executionState != null || throwable instanceof TimeoutException) {
                final boolean producerReady = onPartitionStateUpdate(
                    resultPartitionId,
                    executionState != null ? executionState : ExecutionState.RUNNING);
                producerReadyFuture.complete(producerReady);
            } else {
                if (throwable instanceof PartitionProducerDisposedException) {
                    String msg = String.format(
                        "Producer %s of partition %s disposed. Cancelling execution.",
                        resultPartitionId.getProducerId(),
                        resultPartitionId.getPartitionId());
                    LOG.info(msg, throwable);
                    cancelExecution();
                } else {
                    failExternally(throwable);
                }
                producerReadyFuture.complete(false);
            }
        },
        executor));

return producerReadyFuture;
```
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286551026

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ##
@@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
         } else {
             failExternally(throwable);
         }
-    } catch (IOException | InterruptedException e) {
-        failExternally(e);
+    } catch (Throwable t) {

Review comment: We should not catch `Throwable` because it could be a legitimate Flink problem which should make us fail the process.
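To illustrate the distinction being drawn here, a minimal sketch: only the checked exceptions that the update is declared to throw are routed to the task failure path, while unchecked exceptions propagate to whoever runs the code. `NarrowCatchSketch`, `Update`, `FailureHandler`, and `runUpdate` are hypothetical names for this example and do not correspond to Flink classes.

```java
import java.io.IOException;

// Hypothetical illustration; not code from the PR.
final class NarrowCatchSketch {
    // Stand-in for an operation such as updating partition info.
    interface Update {
        void run() throws IOException, InterruptedException;
    }

    // Stand-in for something like a task's external failure hook.
    interface FailureHandler {
        void fail(Throwable cause);
    }

    private NarrowCatchSketch() {}

    static void runUpdate(Update update, FailureHandler handler) {
        try {
            update.run();
        } catch (IOException | InterruptedException e) {
            // Expected failure modes: fail the task, keep the process alive.
            handler.fail(e);
        }
        // Anything unchecked (e.g. IllegalStateException) is not caught here
        // and bubbles up to the caller, where it can fail the component.
    }
}
```

With `catch (Throwable t)` in place of the multi-catch, a programming error inside the update would be turned into an ordinary task failure and the underlying bug would be easier to miss.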
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286565922

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ##
@@ -1221,53 +1214,46 @@ public void run() {
  */
 @VisibleForTesting
 void onPartitionStateUpdate(

Review comment: Maybe rename to `isProducerReadyAndHandlePartitionStateUpdate`.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286546519 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ## @@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception { if (task != null) { for (final PartitionInfo partitionInfo: partitionInfos) { - IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID(); - - final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID); - - if (singleInputGate != null) { - // Run asynchronously because it might be blocking - getRpcService().execute( - () -> { - try { - singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor()); - } catch (IOException | InterruptedException e) { - log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e); - - try { - task.failExternally(e); - } catch (RuntimeException re) { - // TODO: Check whether we need this or make exception in failExtenally checked - log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re); - } - } - }); - } else { - return FutureUtils.completedExceptionally( - new PartitionException("No reader with ID " + intermediateResultPartitionID + - " for task " + executionAttemptID + " was found.")); - } + // Run asynchronously because it might be blocking + getRpcService().execute( + () -> { + try { + networkEnvironment.updatePartitionInfo(partitionInfo); + } catch (Throwable t) { + log.error( + "Could not update input data location for task {}. 
Trying to fail task.",
+                    task.getTaskInfo().getTaskName(),
+                    t);
+                FutureUtils.assertNoException(CompletableFuture.runAsync(
+                    () -> task.failExternally(t),
+                    getRpcService().getExecutor()));
+            }
+        });

Review comment: I would suggest replacing this block with:

```
FutureUtils.assertNoException(
    CompletableFuture.runAsync(
        () -> {
            try {
                networkEnvironment.updatePartitionInfo(partitionInfo);
            } catch (IOException | InterruptedException e) {
                log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
                task.failExternally(e);
            }
        },
        getRpcService().getExecutor()));
```
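The idea behind wrapping the future is that an exception escaping the lambda gets escalated instead of being lost inside a future nobody observes. The sketch below is a stand-in written for this note, not Flink's `FutureUtils`: the explicit `fatalHandler` parameter is an assumption made so that the example is self-contained and testable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in illustrating the pattern; not Flink's implementation.
final class NoExceptionSketch {
    private NoExceptionSketch() {}

    // Forwards any exceptional completion of the future to the given handler.
    static void assertNoException(CompletableFuture<?> future, Consumer<Throwable> fatalHandler) {
        future.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                fatalHandler.accept(throwable);
            }
        });
    }
}
```

Combined with a narrow `catch (IOException | InterruptedException e)` inside the lambda, expected failures fail the task, and anything else surfaces through the handler rather than disappearing.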
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286568353

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ##
@@ -97,13 +104,15 @@ private NetworkEnvironment(
     NetworkBufferPool networkBufferPool,
     ConnectionManager connectionManager,
     ResultPartitionManager resultPartitionManager,
+    Map inputGatesById,

Review comment: Passing the map for storing the input gates into `NetworkEnvironment` exposes, in my opinion, a bit too many implementation details of this structure. It basically says that the object needs to store the `SingleInputGates` in this structure; otherwise the test will fail. I think it would be better to have a `getInputGate(IntermediateDataSetID)` and let the way the gates are stored internally be an implementation detail.
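A minimal sketch of the accessor-based shape, with the map kept private. `GateRegistry`, `registerInputGate`, the generic type parameters, and the `Optional` return type are placeholders chosen for this illustration; only the method name `getInputGate` comes from the comment above.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; K stands in for the gate ID type, G for the gate type.
final class GateRegistry<K, G> {
    // The storage choice is private and can change without touching callers or tests.
    private final Map<K, G> gatesById = new ConcurrentHashMap<>();

    void registerInputGate(K id, G gate) {
        gatesById.put(id, gate);
    }

    // Callers and tests go through the accessor instead of inspecting the map.
    Optional<G> getInputGate(K id) {
        return Optional.ofNullable(gatesById.get(id));
    }
}
```

A test written against `getInputGate` keeps passing if the internal map is later replaced by another structure, which is the decoupling the comment asks for.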
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285924862

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ##

@@ -614,34 +612,23 @@ private void stopTaskExecutorServices() throws Exception { if (task != null) { for (final PartitionInfo partitionInfo: partitionInfos) { - IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID(); - - final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID); + // Run asynchronously because it might be blocking + getRpcService().execute( + () -> { + try { + networkEnvironment.updatePartitionInfo(partitionInfo); + } catch (IOException | PartitionException | InterruptedException e) {

Review comment: I think that previously it would send the `PartitionException` back to the `JobMaster`, which would then fail the `Execution`. This should have the same effect. Verifying whether this is guarded by a test makes sense, though.
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285663137

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ##

@@ -614,34 +612,23 @@ private void stopTaskExecutorServices() throws Exception { if (task != null) { for (final PartitionInfo partitionInfo: partitionInfos) { - IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID(); - - final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID); + // Run asynchronously because it might be blocking + getRpcService().execute( + () -> { + try { + networkEnvironment.updatePartitionInfo(partitionInfo); + } catch (IOException | PartitionException | InterruptedException e) { + log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e); - if (singleInputGate != null) { - // Run asynchronously because it might be blocking - getRpcService().execute( - () -> { try { - singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor()); - } catch (IOException | InterruptedException e) { - log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e); - - try { - task.failExternally(e); - } catch (RuntimeException re) { - // TODO: Check whether we need this or make exception in failExtenally checked - log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re); - } + task.failExternally(e); + } catch (RuntimeException re) {

Review comment: I would suggest removing this catch block and instead doing the following:

```
FutureUtils.assertNoException(
    CompletableFuture.runAsync(() -> ..., getRpcService()));
```
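To illustrate the pattern behind this suggestion, here is a JDK-only sketch. It does not use Flink's `FutureUtils`; the local `assertNoException` helper is a hypothetical stand-in that takes an explicit failure handler, on the assumption that the real utility escalates any unexpected failure of the future instead of letting it be swallowed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class AssertNoExceptionSketch {

    /**
     * Hypothetical helper: if the given future completes exceptionally,
     * the failure is passed to fatalHandler; a normal completion is ignored.
     */
    static CompletableFuture<Void> assertNoException(
            CompletableFuture<?> future, Consumer<Throwable> fatalHandler) {
        return future.<Void>handle((ignored, failure) -> {
            if (failure != null) {
                fatalHandler.accept(failure);
            }
            return null;
        });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> reported = new AtomicReference<>();

        // The asynchronous action throws an unchecked exception; without the
        // wrapper nobody would observe it, so no local catch block is needed.
        CompletableFuture<Void> action = CompletableFuture.runAsync(
                () -> {
                    throw new IllegalStateException("unexpected failure");
                },
                executor);

        assertNoException(action, reported::set).join();
        executor.shutdown();

        System.out.println("failure reported: " + (reported.get() != null));
    }
}
```

The design point is that unexpected runtime exceptions are handled in one place by the wrapper, so the lambda only needs to catch the checked exceptions it can actually react to.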
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285662172

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ##

@@ -331,6 +346,24 @@ private void registerInputMetrics(MetricGroup inputGroup, MetricGroup buffersGro buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates)); } + /** +* Update consuming gate with newly available partition. +* +* @param partitionInfo telling where the partition can be retrieved from +* @throws IOException IO problem by the update +* @throws InterruptedException potentially blocking operation was interrupted +* @throws PartitionException the input gate with the id from the partitionInfo is not found +*/ + public void updatePartitionInfo(PartitionInfo partitionInfo) + throws IOException, InterruptedException, PartitionException { + IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID(); + SingleInputGate inputGate = inputGatesById.get(intermediateResultPartitionID); + if (inputGate == null) { + throw new PartitionException("No reader with ID " + intermediateResultPartitionID + " was found.");

Review comment: Should this maybe be an `IllegalStateException` because this should actually not happen?
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285638898

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ##

@@ -110,6 +117,8 @@ public NetworkEnvironment( this.resultPartitionManager = new ResultPartitionManager(); + this.inputGatesById = new ConcurrentHashMap<>();

Review comment: Which threads access this structure concurrently?
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285640957

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ##

@@ -634,7 +633,8 @@ void notifyChannelNonEmpty(InputChannel channel) { } void triggerPartitionStateCheck(ResultPartitionID partitionId) { - taskActions.triggerPartitionProducerStateCheck(jobId, consumedResultId, partitionId); + taskActions.triggerPartitionProducerStateCheck(consumedResultId, partitionId, + () -> retriggerPartitionRequest(partitionId.getPartitionId()));

Review comment: Line breaking
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285644704

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ##

@@ -542,6 +543,29 @@ public void testUpdateUnknownInputChannel() throws Exception { } } + @Test + public void checkInputGateRemoveInNetworkEnvironment() throws IOException { + NetworkEnvironment network = createNetworkEnvironment(); + InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{ + // Local + new InputChannelDeploymentDescriptor( + new ResultPartitionID(), + ResultPartitionLocation.createLocal())}; + IntermediateDataSetID id = new IntermediateDataSetID(); + InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(id, + ResultPartitionType.PIPELINED, 0, channelDescs); + SingleInputGate[] inputGates = network.createInputGates("", + new NoOpTaskActions(), Collections.singletonList(igdd), + new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), + new SimpleCounter()); + assertEquals(1, inputGates.length); + assertEquals(1, network.getInputGatesById().size()); + assertTrue(network.getInputGatesById().containsKey(id)); + inputGates[0].close(); + assertEquals(0, network.getInputGatesById().size()); + assertFalse(network.getInputGatesById().containsKey(id));

Review comment: The last two assertions seem to be redundant.
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285644373

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ##

@@ -542,6 +543,29 @@ public void testUpdateUnknownInputChannel() throws Exception { } } + @Test + public void checkInputGateRemoveInNetworkEnvironment() throws IOException { + NetworkEnvironment network = createNetworkEnvironment(); + InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{ + // Local + new InputChannelDeploymentDescriptor( + new ResultPartitionID(), + ResultPartitionLocation.createLocal())}; + IntermediateDataSetID id = new IntermediateDataSetID(); + InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(id, + ResultPartitionType.PIPELINED, 0, channelDescs); + SingleInputGate[] inputGates = network.createInputGates("", + new NoOpTaskActions(), Collections.singletonList(igdd), + new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), + new SimpleCounter());

Review comment: line breaks
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285644496

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ##

@@ -542,6 +543,29 @@ public void testUpdateUnknownInputChannel() throws Exception { } } + @Test + public void checkInputGateRemoveInNetworkEnvironment() throws IOException { + NetworkEnvironment network = createNetworkEnvironment(); + InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{ + // Local + new InputChannelDeploymentDescriptor( + new ResultPartitionID(), + ResultPartitionLocation.createLocal())}; + IntermediateDataSetID id = new IntermediateDataSetID(); + InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(id, + ResultPartitionType.PIPELINED, 0, channelDescs); + SingleInputGate[] inputGates = network.createInputGates("", + new NoOpTaskActions(), Collections.singletonList(igdd), + new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), + new SimpleCounter()); + assertEquals(1, inputGates.length); + assertEquals(1, network.getInputGatesById().size()); + assertTrue(network.getInputGatesById().containsKey(id)); + inputGates[0].close(); + assertEquals(0, network.getInputGatesById().size()); + assertFalse(network.getInputGatesById().containsKey(id));

Review comment: I'd suggest using Hamcrest for new tests.
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285660972

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ##

@@ -185,22 +181,24 @@ private final Counter numBytesIn; + private final Runnable closeListener; + public SingleInputGate( String owningTaskName, - JobID jobId, IntermediateDataSetID consumedResultId, final ResultPartitionType consumedPartitionType, int consumedSubpartitionIndex, int numberOfInputChannels, TaskActions taskActions, Counter numBytesIn, - boolean isCreditBased) { + boolean isCreditBased, + Runnable closeListener) {

Review comment: For the close listener we could maybe apply a similar trick as with the partition request retriggering. We could for example add a termination future to the `SingleInputGate` which is completed once the gate gets closed. On this future we can register the removal from the `inputGatesById` map. WDYT?
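The termination-future idea from this comment could look roughly like the following JDK-only sketch. `Gate` and `Environment` are hypothetical stand-ins for `SingleInputGate` and `NetworkEnvironment`, and `getCloseFuture` is an assumed method name, not one taken from the pull request.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class TerminationFutureSketch {

    /** Hypothetical stand-in for SingleInputGate: no close listener in the constructor. */
    static final class Gate implements AutoCloseable {
        private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        /** Completed once the gate has been closed. */
        CompletableFuture<Void> getCloseFuture() {
            return closeFuture;
        }

        @Override
        public void close() {
            // complete() is a no-op on repeated calls, so close() is idempotent here.
            closeFuture.complete(null);
        }
    }

    /** Hypothetical stand-in for NetworkEnvironment, which owns the map. */
    static final class Environment {
        private final Map<String, Gate> gatesById = new ConcurrentHashMap<>();

        Gate createGate(String id) {
            Gate gate = new Gate();
            gatesById.put(id, gate);
            // Register the cleanup on the future instead of handing a Runnable to the gate.
            // remove(key, value) only removes the entry if it still maps to this gate.
            gate.getCloseFuture().thenRun(() -> gatesById.remove(id, gate));
            return gate;
        }

        boolean hasGate(String id) {
            return gatesById.containsKey(id);
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment();
        Gate gate = env.createGate("result-1");
        System.out.println("before close: " + env.hasGate("result-1"));
        gate.close();
        System.out.println("after close: " + env.hasGate("result-1"));
    }
}
```

In this arrangement the gate knows nothing about who is interested in its termination, which keeps its constructor free of environment-specific callbacks.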
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285639189

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ##

@@ -150,6 +159,11 @@ public NetworkEnvironmentConfiguration getConfiguration() { return config; } + @VisibleForTesting + public Map getInputGatesById() {

Review comment: Do we need to expose the map or would it be enough to have `getInputGate(IntermediateDataSetID)`?
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285640257

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ##

@@ -331,6 +346,24 @@ private void registerInputMetrics(MetricGroup inputGroup, MetricGroup buffersGro buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates)); } + /** +* Update consuming gate with newly available partition. +* +* @param partitionInfo telling where the partition can be retrieved from +* @throws IOException IO problem by the update +* @throws InterruptedException potentially blocking operation was interrupted +* @throws PartitionException the input gate with the id from the partitionInfo is not found +*/ + public void updatePartitionInfo(PartitionInfo partitionInfo) + throws IOException, InterruptedException, PartitionException {

Review comment: Personally I'd prefer to put `throws` on the same line as `)`. The problem is that it now looks as if the `throws` line belongs to the body. Just personal taste, though.
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r285658953

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java ##

@@ -30,15 +30,15 @@ /** * Check the execution state of the execution producing a result partition. * -* @param jobId ID of the job the partition belongs to. * @param intermediateDataSetId ID of the parent intermediate data set. * @param resultPartitionId ID of the result partition to check. This * identifies the producing execution and partition. +* @param producerReadyCallback callback of producer is in one of consumable states. */ void triggerPartitionProducerStateCheck( - JobID jobId, IntermediateDataSetID intermediateDataSetId, - ResultPartitionID resultPartitionId); + ResultPartitionID resultPartitionId, + ThrowingRunnable producerReadyCallback);

Review comment: Instead of adding this callback, we could also let this method return a `CompletableFuture` whose completion signals that the partition request should be retriggered. That way we would not have to pass in a callback which is then forwarded to some other place.
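A rough, JDK-only sketch of the future-returning variant proposed here. The `Actions` interface and `Gate` class are hypothetical stand-ins for `TaskActions` and `SingleInputGate`; the simplified `String` parameters and the `retriggered` counter exist only to make the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerStateFutureSketch {

    /** Hypothetical variant of TaskActions: returns a future instead of accepting a callback. */
    interface Actions {
        /** The returned future completes when the producer is ready to be consumed. */
        CompletableFuture<Void> triggerPartitionProducerStateCheck(String resultPartitionId);
    }

    /** Hypothetical stand-in for SingleInputGate. */
    static final class Gate {
        private final Actions actions;
        final AtomicInteger retriggered = new AtomicInteger();

        Gate(Actions actions) {
            this.actions = actions;
        }

        void triggerPartitionStateCheck(String partitionId) {
            // The caller decides what happens on completion; nothing is forwarded
            // through the Actions implementation.
            actions.triggerPartitionProducerStateCheck(partitionId)
                    .thenRun(() -> retriggerPartitionRequest(partitionId));
        }

        void retriggerPartitionRequest(String partitionId) {
            retriggered.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> producerReady = new CompletableFuture<>();
        Gate gate = new Gate(partitionId -> producerReady);

        gate.triggerPartitionStateCheck("partition-1");
        System.out.println("retriggered before ready: " + gate.retriggered.get());

        producerReady.complete(null);
        System.out.println("retriggered after ready: " + gate.retriggered.get());
    }
}
```

A failed check could be represented by completing the future exceptionally, in which case `thenRun` would skip the retrigger; how failures should actually be reported is left open in the comment.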