[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327972417

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/NeverRestartBackoffTimeStrategy.java ##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+/**
+ * Restart backoff time strategy that never restarts.
+ */
+public enum NeverRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {

Review comment:
PR#8912 also introduces a NoRestartBackoffTimeStrategy. I think we can drop this class once PR#8912 is merged, and the restart strategy loading should be updated accordingly.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
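For illustration, a stateless never-restart strategy fits naturally into an enum singleton, which is the shape of the class under review. The sketch below is a minimal, self-contained approximation: `SimpleRestartBackoffTimeStrategy` is a hypothetical stand-in for `RestartBackoffTimeStrategy`, and its three methods (`canRestart()`, `getBackoffTime()`, `notifyFailure(Throwable)`) are assumptions, not a copy of Flink's interface.

```java
/** Hypothetical, simplified stand-in for a restart backoff time strategy interface. */
interface SimpleRestartBackoffTimeStrategy {

    /** Whether a restart is allowed given the failures seen so far. */
    boolean canRestart();

    /** Delay in milliseconds to wait before restarting. */
    long getBackoffTime();

    /** Records a failure so the strategy can update its internal state. */
    void notifyFailure(Throwable cause);
}

/** Enum singleton: a stateless strategy only ever needs one shared instance. */
enum NeverRestartStrategySketch implements SimpleRestartBackoffTimeStrategy {
    INSTANCE;

    @Override
    public boolean canRestart() {
        // Never allow a restart, regardless of how many failures occurred.
        return false;
    }

    @Override
    public long getBackoffTime() {
        // Irrelevant because canRestart() is always false.
        return 0L;
    }

    @Override
    public void notifyFailure(Throwable cause) {
        // Stateless: nothing to record.
    }
}
```

Whichever of the two classes survives after PR#8912, the enum form keeps the "no restart" case free of per-job state.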
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327982708

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java ##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This class is a tuple holding the information necessary to deploy an {@link ExecutionVertex}.
+ *
+ * The tuple consists of:
+ *
+ * {@link ExecutionVertexVersion}
+ * {@link ExecutionVertexDeploymentOption}
+ * {@link SlotExecutionVertexAssignment}
+ *
+ */
+class DeploymentHandle {
+
+	private final ExecutionVertexVersion requiredVertexVersion;
+
+	private final ExecutionVertexDeploymentOption executionVertexDeploymentOption;
+
+	private final SlotExecutionVertexAssignment slotExecutionVertexAssignment;
+
+	public DeploymentHandle(
+			final ExecutionVertexVersion requiredVertexVersion,
+			final ExecutionVertexDeploymentOption executionVertexDeploymentOption,
+			final SlotExecutionVertexAssignment slotExecutionVertexAssignment) {
+
+		this.requiredVertexVersion = Preconditions.checkNotNull(requiredVertexVersion);
+		this.executionVertexDeploymentOption = Preconditions.checkNotNull(executionVertexDeploymentOption);
+		this.slotExecutionVertexAssignment = Preconditions.checkNotNull(slotExecutionVertexAssignment);
+	}
+
+	public ExecutionVertexID getExecutionVertexId() {
+		return requiredVertexVersion.getExecutionVertexId();
+	}
+
+	public ExecutionVertexVersion getRequiredVertexVersion() {
+		return requiredVertexVersion;
+	}
+
+	public DeploymentOption getDeploymentOption() {
+		return executionVertexDeploymentOption.getDeploymentOption();
+	}
+
+	public SlotExecutionVertexAssignment getSlotExecutionVertexAssignment() {
+		return slotExecutionVertexAssignment;
+	}
+
+	public LogicalSlot getLogicalSlot() {
+		final LogicalSlot logicalSlot = slotExecutionVertexAssignment.getLogicalSlotFuture().getNow(null);

Review comment:
This method is currently called in DefaultScheduler#stopDeployment. The future may have completed exceptionally, e.g. when the slot allocation timed out. In that case getNow(null) throws a CompletionException, which may then cause a fatal error.
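A sketch of the failure mode and one defensive alternative: `CompletableFuture#getNow(null)` returns `null` while the future is still pending, but throws a `CompletionException` once the future has completed exceptionally. The hypothetical helper `getResultIfSuccessful` checks `isCompletedExceptionally()` first (which is also true for cancelled futures); in the usage a `String` would stand in for `LogicalSlot`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class FutureResults {

    private FutureResults() {
    }

    /**
     * Hypothetical helper: returns the result only if the future completed normally.
     * Pending, failed and cancelled futures all yield Optional.empty() instead of throwing.
     */
    static <T> Optional<T> getResultIfSuccessful(CompletableFuture<T> future) {
        if (!future.isDone() || future.isCompletedExceptionally()) {
            return Optional.empty();
        }
        // Safe: the future is done and did not complete exceptionally, so getNow cannot throw.
        return Optional.ofNullable(future.getNow(null));
    }
}
```

With such a check, a timed-out slot request would simply mean "no slot to release" in `stopDeployment`; whether that is the right semantics there is a decision for the scheduler.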
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327972417

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/NeverRestartBackoffTimeStrategy.java ##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+/**
+ * Restart backoff time strategy that never restarts.
+ */
+public enum NeverRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {

Review comment:
PR#8912 also introduces a NoRestartBackoffTimeStrategy. We may drop this class once PR#8912 is merged; the restart strategy loading should then be updated as well.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327979650

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ##
@@ -75,10 +128,292 @@ public DefaultScheduler(
 			slotRequestTimeout,
 			shuffleMaster,
 			partitionTracker);
+
+		this.log = log;
+
+		this.delayExecutor = checkNotNull(delayExecutor);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.executionVertexOperations = checkNotNull(executionVertexOperations);
+		this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+		this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());

Review comment:
It would be helpful to log which scheduling strategy is used.
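A minimal sketch of such logging. In the scheduler this would presumably go through the existing `log` field right after `schedulingStrategy` is created; here `java.util.logging` stands in so the snippet is self-contained, and `logSchedulingStrategy` is a hypothetical helper.

```java
import java.util.logging.Logger;

final class SchedulingStrategyLogging {

    private static final Logger LOG = Logger.getLogger(SchedulingStrategyLogging.class.getName());

    private SchedulingStrategyLogging() {
    }

    /** Logs (and returns) a message naming the concrete scheduling strategy class. */
    static String logSchedulingStrategy(Object schedulingStrategy) {
        String message = "Using scheduling strategy " + schedulingStrategy.getClass().getName();
        LOG.info(message);
        return message;
    }
}
```

Logging the fully qualified class name makes it unambiguous which strategy (for example eager or lazy) a given job ended up with.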
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327709276

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -1129,6 +1157,9 @@ void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID fai
 	 * @param t The exception that caused the failure.
 	 */
 	public void failGlobal(Throwable t) {
+		if (!isLegacyScheduling()) {
+			ExceptionUtils.rethrow(t);

Review comment:
Ok.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327494085

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ##
@@ -75,10 +128,293 @@ public DefaultScheduler(
 			slotRequestTimeout,
 			shuffleMaster,
 			partitionTracker);
+
+		this.log = log;
+
+		this.delayExecutor = checkNotNull(delayExecutor);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.executionVertexOperations = checkNotNull(executionVertexOperations);
+		this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+		this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+		this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+		setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+	}
+
+	//
+	// SchedulerNG
+	//
+
+	@Override
+	protected void startSchedulingInternal() {
+		prepareExecutionGraphForScheduling();
+		schedulingStrategy.startScheduling();
+	}
+
+	@Override
+	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+		final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+		if (executionVertexIdOptional.isPresent()) {
+			final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+			updateState(taskExecutionState);
+			schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+			maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+			return true;
+		}
+
+		return false;
+	}
+
+	private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
+			final ExecutionVertexID executionVertexId) {
+		if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+			final Throwable error = taskExecutionState.getError(userCodeLoader);
+			handleTaskFailure(executionVertexId, error);
+		}
+	}
+
+	private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+		final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+		maybeRestartTasks(failureHandlingResult);
+	}
+
+	private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+		if (failureHandlingResult.canRestart()) {
+			restartTasksWithDelay(failureHandlingResult);
+		} else {
+			failJob(failureHandlingResult.getError());
+		}
+	}
+
+	private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+		final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+		final Set<ExecutionVertexVersion> executionVertexVersions =
+			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+		final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
+
+		delayExecutor.schedule(
+			() -> FutureUtils.assertNoException(
+				cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+			failureHandlingResult.getRestartDelayMS(),
+			TimeUnit.MILLISECONDS);
+	}
+
+	private BiFunction restartTasksOrHandleError(final Set<ExecutionVertexVersion> executionVertexVersions) {
+		return (Object ignored, Throwable throwable) -> {
+
+			if (throwable == null) {
+				final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+				schedulingStrategy.restartTasks(verticesToRestart);
+			} else {
+				failJob(throwable);
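`restartTasksWithDelay` relies on vertex versioning: versions are recorded before the delayed restart, and when the delay elapses only vertices whose version is unchanged are restarted, presumably so that a vertex modified again in the meantime (e.g. by a later failover) is left to whoever modified it. A minimal sketch of that idea; `VertexVersionerSketch` and its `String` vertex IDs are hypothetical stand-ins for `ExecutionVertexVersioner` and `ExecutionVertexID`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class VertexVersionerSketch {

    /** A vertex ID together with the version it had when it was recorded. */
    static final class VertexVersion {
        final String vertexId;
        final long version;

        VertexVersion(String vertexId, long version) {
            this.vertexId = vertexId;
            this.version = version;
        }
    }

    // Current version per vertex; a missing entry means version 0.
    private final Map<String, Long> currentVersions = new HashMap<>();

    /** Bumps the version of every given vertex and returns the new versions. */
    Map<String, VertexVersion> recordVertexModifications(Set<String> vertexIds) {
        Map<String, VertexVersion> result = new HashMap<>();
        for (String id : vertexIds) {
            long next = currentVersions.getOrDefault(id, 0L) + 1;
            currentVersions.put(id, next);
            result.put(id, new VertexVersion(id, next));
        }
        return result;
    }

    /** Returns the IDs whose version is still the one captured in the snapshot. */
    Set<String> getUnmodifiedVertices(Set<VertexVersion> snapshot) {
        Set<String> unmodified = new HashSet<>();
        for (VertexVersion v : snapshot) {
            if (currentVersions.getOrDefault(v.vertexId, 0L) == v.version) {
                unmodified.add(v.vertexId);
            }
        }
        return unmodified;
    }
}
```

For example, if vertices `a` and `b` are recorded for a restart and `b` is modified again before the delay elapses, only `a` is returned as a vertex to restart.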
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327446705

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ##
@@ -75,10 +137,281 @@ public DefaultScheduler(
 			slotRequestTimeout,
 			shuffleMaster,
 			partitionTracker);
+
+		this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+		this.slotRequestTimeout = slotRequestTimeout;
+		this.slotProvider = slotProvider;
+		this.delayExecutor = delayExecutor;
+		this.userCodeLoader = userCodeLoader;
+		this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory);
+		this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory);
+		this.executionVertexOperations = checkNotNull(executionVertexOperations);
+		this.executionVertexVersioner = executionVertexVersioner;
+		this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner);
+	}
+
+	//
+	// SchedulerNG
+	//
+
+	@Override
+	public void startSchedulingInternal() {
+		initializeScheduling();
+		schedulingStrategy.startScheduling();
+	}
+
+	private void initializeScheduling() {
+		executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+		schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+		executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+		setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID()));
+		prepareExecutionGraphForScheduling();
+	}
+
+	@Override
+	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+		final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+		if (executionVertexIdOptional.isPresent()) {
+			final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+			updateState(taskExecutionState);
+			schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+			maybeHandleTaskFailure(taskExecutionState, executionVertexId);

Review comment:
I think it's fine not to change it for now, given that:
1. At the moment, schedulingStrategy.onExecutionStateChange() is only notified of the FAILED state internally, and of the RUNNING/FINISHED/CANCELED/FAILED states from the TM side.
2. The RUNNING/CANCELED/FAILED states do not trigger scheduling in the eager/lazy scheduling strategies.
3. The FINISHED state is currently used for lazy scheduling, but this may change, as discussed [in this comment](https://github.com/apache/flink/pull/9663#discussion_r326540913).

We can reconsider this later, when we redefine the scheduling strategy interfaces or refine the invocations as discussed in [the other comment](https://github.com/apache/flink/pull/9663/files#r326897974).
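Points 2 and 3 can be illustrated with a hypothetical lazy-style strategy whose `onExecutionStateChange` reacts only to `FINISHED`. Because the other states are no-ops, notifying the strategy before `maybeHandleTaskFailure` runs does not trigger any scheduling. The types below are simplified stand-ins, not Flink's scheduling strategy classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical subset of execution states, for illustration only. */
enum SketchExecutionState {
    RUNNING, FINISHED, CANCELED, FAILED
}

/** Hypothetical lazy-style strategy: only a FINISHED vertex releases its consumers. */
final class LazySchedulingSketch {

    private final List<String> scheduledConsumersOf = new ArrayList<>();

    void onExecutionStateChange(String vertexId, SketchExecutionState state) {
        if (state == SketchExecutionState.FINISHED) {
            // A real strategy would schedule the consumers of vertexId here.
            scheduledConsumersOf.add(vertexId);
        }
        // RUNNING, CANCELED and FAILED are no-ops: they never trigger scheduling.
    }

    List<String> getScheduledConsumersOf() {
        return scheduledConsumersOf;
    }
}
```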
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327421441

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -1129,6 +1157,9 @@ void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID fai
 	 * @param t The exception that caused the failure.
 	 */
 	public void failGlobal(Throwable t) {
+		if (!isLegacyScheduling()) {
+			ExceptionUtils.rethrow(t);

Review comment:
> For real consistency issues caused by bugs, illegal state transitions, etc. I think it is reasonable to terminate the JVM.

I think that's reasonable, but I'm not sure whether this change could cause many more JM failovers than before. This could be an even more severe problem for a session cluster, which holds multiple jobs in one Dispatcher.

> One could propagate the exception to the JobMaster and in the RPC framework check for unwanted uncaught exceptions in specially annotated RPC methods.

If we'd like to terminate the JVM, why not do it right where the fatal error happens?
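The alternative raised in the last question, reporting the error where it is detected instead of rethrowing it, could look roughly like this. `FatalErrorCallback` and `FailGlobalSketch` are hypothetical; a production callback might terminate the JVM, whereas a test would only record the error.

```java
/** Hypothetical callback for unrecoverable errors (a real one might terminate the JVM). */
interface FatalErrorCallback {
    void onFatalError(Throwable error);
}

final class FailGlobalSketch {

    private final boolean legacyScheduling;
    private final FatalErrorCallback fatalErrorCallback;
    private int globalFailovers;

    FailGlobalSketch(boolean legacyScheduling, FatalErrorCallback fatalErrorCallback) {
        this.legacyScheduling = legacyScheduling;
        this.fatalErrorCallback = fatalErrorCallback;
    }

    void failGlobal(Throwable t) {
        if (!legacyScheduling) {
            // Handle the fatal error where it is detected, instead of rethrowing it
            // and relying on a caller further up the stack (e.g. the RPC layer) to catch it.
            fatalErrorCallback.onFatalError(t);
            return;
        }
        // Legacy path: trigger a global failover (modelled here as a counter).
        globalFailovers++;
    }

    int getGlobalFailovers() {
        return globalFailovers;
    }
}
```

The trade-off from the comment still applies: in a session cluster, terminating the process would affect every job hosted by the same Dispatcher.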
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327418704

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ##
@@ -75,10 +128,293 @@ public DefaultScheduler(
 			slotRequestTimeout,
 			shuffleMaster,
 			partitionTracker);
+
+		this.log = log;
+
+		this.delayExecutor = checkNotNull(delayExecutor);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.executionVertexOperations = checkNotNull(executionVertexOperations);
+		this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+		this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+		this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+		setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+	}
+
+	//
+	// SchedulerNG
+	//
+
+	@Override
+	protected void startSchedulingInternal() {
+		prepareExecutionGraphForScheduling();
+		schedulingStrategy.startScheduling();
+	}
+
+	@Override
+	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+		final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+		if (executionVertexIdOptional.isPresent()) {
+			final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+			updateState(taskExecutionState);
+			schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+			maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+			return true;
+		}
+
+		return false;
+	}
+
+	private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
+			final ExecutionVertexID executionVertexId) {
+		if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+			final Throwable error = taskExecutionState.getError(userCodeLoader);
+			handleTaskFailure(executionVertexId, error);
+		}
+	}
+
+	private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+		final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+		maybeRestartTasks(failureHandlingResult);
+	}
+
+	private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+		if (failureHandlingResult.canRestart()) {
+			restartTasksWithDelay(failureHandlingResult);
+		} else {
+			failJob(failureHandlingResult.getError());
+		}
+	}
+
+	private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+		final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+		final Set<ExecutionVertexVersion> executionVertexVersions =
+			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+		final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
+
+		delayExecutor.schedule(
+			() -> FutureUtils.assertNoException(
+				cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+			failureHandlingResult.getRestartDelayMS(),
+			TimeUnit.MILLISECONDS);
+	}
+
+	private BiFunction restartTasksOrHandleError(final Set<ExecutionVertexVersion> executionVertexVersions) {
+		return (Object ignored, Throwable throwable) -> {
+
+			if (throwable == null) {
+				final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+				schedulingStrategy.restartTasks(verticesToRestart);
+			} else {
+				failJob(throwable);
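The delayed restart in `restartTasksWithDelay` can be reduced to plain `java.util.concurrent` types: wait for the backoff delay, then handle the cancellation result on the main-thread executor, restarting on success and failing the job otherwise. This is a sketch under assumptions: `DelayedRestartSketch` and its parameters are hypothetical stand-ins for the scheduler's fields (`delayExecutor`, `getMainThreadExecutor()`), and Flink's `FutureUtils.assertNoException` wrapper is omitted.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;

final class DelayedRestartSketch {

    private DelayedRestartSketch() {
    }

    static ScheduledFuture<CompletableFuture<Void>> restartTasksWithDelay(
            CompletableFuture<?> cancelFuture,
            ScheduledExecutorService delayExecutor,
            Executor mainThreadExecutor,
            long restartDelayMs,
            Runnable restartTasks,
            Consumer<Throwable> failJob) {

        // Restart only if cancellation succeeded; otherwise fail the whole job.
        BiFunction<Object, Throwable, Void> restartTasksOrHandleError = (ignored, throwable) -> {
            if (throwable == null) {
                restartTasks.run();
            } else {
                failJob.accept(throwable);
            }
            return null;
        };

        // After the backoff delay, attach the handler; it runs on the main-thread executor.
        Callable<CompletableFuture<Void>> attachHandler =
            () -> cancelFuture.handleAsync(restartTasksOrHandleError, mainThreadExecutor);

        return delayExecutor.schedule(attachHandler, restartDelayMs, TimeUnit.MILLISECONDS);
    }
}
```

Running the handler on a single main-thread executor keeps all scheduler state changes on one thread, which is why the diff passes `getMainThreadExecutor()` to `handleAsync`.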
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327410288

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ##
@@ -75,10 +128,293 @@ public DefaultScheduler(
 			slotRequestTimeout,
 			shuffleMaster,
 			partitionTracker);
+
+		this.log = log;
+
+		this.delayExecutor = checkNotNull(delayExecutor);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.executionVertexOperations = checkNotNull(executionVertexOperations);
+		this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+		this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+		this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+		setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+	}
+
+	//
+	// SchedulerNG
+	//
+
+	@Override
+	protected void startSchedulingInternal() {
+		prepareExecutionGraphForScheduling();
+		schedulingStrategy.startScheduling();
+	}
+
+	@Override
+	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+		final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+		if (executionVertexIdOptional.isPresent()) {
+			final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+			updateState(taskExecutionState);
+			schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+			maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+			return true;
+		}
+
+		return false;
+	}
+
+	private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
+			final ExecutionVertexID executionVertexId) {
+		if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+			final Throwable error = taskExecutionState.getError(userCodeLoader);
+			handleTaskFailure(executionVertexId, error);
+		}
+	}
+
+	private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+		final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+		maybeRestartTasks(failureHandlingResult);
+	}
+
+	private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+		if (failureHandlingResult.canRestart()) {
+			restartTasksWithDelay(failureHandlingResult);
+		} else {
+			failJob(failureHandlingResult.getError());
+		}
+	}
+
+	private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+		final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+		final Set<ExecutionVertexVersion> executionVertexVersions =
+			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+		final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
+
+		delayExecutor.schedule(
+			() -> FutureUtils.assertNoException(
+				cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+			failureHandlingResult.getRestartDelayMS(),
+			TimeUnit.MILLISECONDS);
+	}
+
+	private BiFunction restartTasksOrHandleError(final Set<ExecutionVertexVersion> executionVertexVersions) {
+		return (Object ignored, Throwable throwable) -> {
+
+			if (throwable == null) {
+				final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+				schedulingStrategy.restartTasks(verticesToRestart);
+			} else {
+				failJob(throwable);
+
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327150696

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java ##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Operations on the ExecutionVertex.
+ */
+interface ExecutionVertexOperations {
+
+	void deploy(ExecutionVertex executionVertex) throws JobException;

Review comment:
Ok.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327147478

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -1485,6 +1525,20 @@ else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failu
 		}
 	}
+	public void failJob(Throwable cause) {
+		if (state == JobStatus.FAILING || state.isGloballyTerminalState()) {
+			return;
+		}
+
+		transitionState(JobStatus.FAILING);
+		initFailureCause(cause);
+
+		cancelVerticesAsync().whenComplete((aVoid, throwable) -> {
+			transitionState(JobStatus.FAILED);

Review comment:
Oh, I see. I think we can do that for the transition to `FAILING` as well.
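The state handling in `failJob` amounts to a guard plus two transitions: ignore the call when the job is already `FAILING` or globally terminal, move to `FAILING`, and move to `FAILED` only once cancellation of all vertices has completed. A minimal sketch with hypothetical types (`SketchJobStatus`, `FailJobSketch`), not Flink's `ExecutionGraph` or `JobStatus`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical subset of job states, for illustration only. */
enum SketchJobStatus {
    RUNNING, FAILING, FAILED, CANCELED, FINISHED;

    /** Terminal states in this sketch: the job never leaves them. */
    boolean isGloballyTerminalState() {
        return this == FAILED || this == CANCELED || this == FINISHED;
    }
}

final class FailJobSketch {

    private final Supplier<CompletableFuture<Void>> cancelVerticesAsync;
    private volatile SketchJobStatus state = SketchJobStatus.RUNNING;
    private volatile Throwable failureCause;

    FailJobSketch(Supplier<CompletableFuture<Void>> cancelVerticesAsync) {
        this.cancelVerticesAsync = cancelVerticesAsync;
    }

    void failJob(Throwable cause) {
        // Ignore the call if the job is already failing or has terminated.
        if (state == SketchJobStatus.FAILING || state.isGloballyTerminalState()) {
            return;
        }
        state = SketchJobStatus.FAILING;
        failureCause = cause;
        // Move to FAILED only after the cancellation of all vertices has completed.
        cancelVerticesAsync.get().whenComplete((ignored, throwable) -> state = SketchJobStatus.FAILED);
    }

    SketchJobStatus getState() {
        return state;
    }

    Throwable getFailureCause() {
        return failureCause;
    }
}
```

A second `failJob` call while cancellation is still in progress is a no-op, so the first failure cause is the one that is kept.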
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327089390

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ##
@@ -75,10 +128,293 @@ public DefaultScheduler(
 			slotRequestTimeout,
 			shuffleMaster,
 			partitionTracker);
+
+		this.log = log;
+
+		this.delayExecutor = checkNotNull(delayExecutor);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.executionVertexOperations = checkNotNull(executionVertexOperations);
+		this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+		this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+		this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+		setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+	}
+
+	//
+	// SchedulerNG
+	//
+
+	@Override
+	protected void startSchedulingInternal() {
+		prepareExecutionGraphForScheduling();
+		schedulingStrategy.startScheduling();
+	}
+
+	@Override
+	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+		final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+		if (executionVertexIdOptional.isPresent()) {
+			final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+			updateState(taskExecutionState);
+			schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+			maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+			return true;
+		}
+
+		return false;
+	}
+
+	private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
+			final ExecutionVertexID executionVertexId) {
+		if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+			final Throwable error = taskExecutionState.getError(userCodeLoader);
+			handleTaskFailure(executionVertexId, error);
+		}
+	}
+
+	private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+		final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+		maybeRestartTasks(failureHandlingResult);
+	}
+
+	private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+		if (failureHandlingResult.canRestart()) {
+			restartTasksWithDelay(failureHandlingResult);
+		} else {
+			failJob(failureHandlingResult.getError());
+		}
+	}
+
+	private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+		final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+		final Set<ExecutionVertexVersion> executionVertexVersions =
+			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+		final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
+
+		delayExecutor.schedule(
+			() -> FutureUtils.assertNoException(
+				cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+			failureHandlingResult.getRestartDelayMS(),
+			TimeUnit.MILLISECONDS);
+	}
+
+	private BiFunction restartTasksOrHandleError(final Set<ExecutionVertexVersion> executionVertexVersions) {
+		return (Object ignored, Throwable throwable) -> {
+
+			if (throwable == null) {
+				final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+				schedulingStrategy.restartTasks(verticesToRestart);
+			} else {
+				failJob(throwable);
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r327083068 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -23,40 +23,93 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy; import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import 
org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** - * Stub implementation of the future default scheduler. + * The future default scheduler. */ -public class DefaultScheduler extends LegacyScheduler { +public class DefaultScheduler extends SchedulerBase implements SchedulerOperations { + + private final Logger log; + + private final ClassLoader userCodeLoader; + + private final ExecutionSlotAllocator executionSlotAllocator; + + private final ExecutionFailureHandler executionFailureHandler; + + private final ScheduledExecutor delayExecutor; + + private final SchedulingStrategy schedulingStrategy; + + private final ExecutionVertexVersioner executionVertexVersioner; + + private final ExecutionVertexOperations executionVertexOperations; public DefaultScheduler( - final Logger log, Review comment: Fine.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r327081463 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java ## @@ -44,13 +44,18 @@ /** Config name for the {@link RestartPipelinedRegionStrategy}. */ public static final String LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-legacy"; + /** Config name for the {@link NoOpFailoverStrategy} */ + public static final String NO_OP_FAILOVER_STRATEGY = "noop"; + // /** * Loads a FailoverStrategy Factory from the given configuration. */ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config, @Nullable Logger logger) { - final String strategyParam = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY); + final String strategyParam = config.getString(JobManagerOptions.SCHEDULER).equals("ng") ? + NO_OP_FAILOVER_STRATEGY : Review comment: I see.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r327079025 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1485,6 +1525,20 @@ else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failu } } + public void failJob(Throwable cause) { + if (state == JobStatus.FAILING || state.isGloballyTerminalState()) { + return; + } + + transitionState(JobStatus.FAILING); + initFailureCause(cause); + + cancelVerticesAsync().whenComplete((aVoid, throwable) -> { + transitionState(JobStatus.FAILED); Review comment: The transition log will contain the cause if one is given. Without the cause, it seems we never log why the job went to FAILING/FAILED?
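The point here is that the FAILING and FAILED transitions should carry the failure cause, so the transition log explains why the job failed. Below is a minimal sketch under stated assumptions. `JobStatusTracker` and its `transitionState(Status, Throwable)` are hypothetical and only model the idea. The real `ExecutionGraph` cancels vertices asynchronously before reaching FAILED, whereas this sketch transitions immediately.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified job status holder; not Flink's ExecutionGraph. */
final class JobStatusTracker {

    enum Status { RUNNING, FAILING, FAILED }

    private Status state = Status.RUNNING;
    private Throwable failureCause;
    private final List<String> transitionLog = new ArrayList<>();

    void failJob(Throwable cause) {
        if (state != Status.RUNNING) {
            return; // already failing or terminal: ignore repeated calls
        }
        failureCause = cause;
        transitionState(Status.FAILING, cause);
        // Simplification: the real code reaches FAILED only after cancellation completes.
        transitionState(Status.FAILED, cause);
    }

    /** Every transition message includes the cause when one is available. */
    private void transitionState(Status target, Throwable cause) {
        String message = "Job switched from " + state + " to " + target + ".";
        if (cause != null) {
            message += " Cause: " + cause;
        }
        transitionLog.add(message);
        state = target;
    }

    List<String> transitionLog() {
        return transitionLog;
    }

    Status state() {
        return state;
    }

    Throwable failureCause() {
        return failureCause;
    }
}
```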
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r327076866 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +128,293 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.log = log; + + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + + this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID())); + } + + // + // SchedulerNG + // + + @Override + protected void startSchedulingInternal() { + prepareExecutionGraphForScheduling(); + schedulingStrategy.startScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); + return true; + } + + return false; + } + + private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, 
final ExecutionVertexID executionVertexId) { + if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { + final Throwable error = taskExecutionState.getError(userCodeLoader); + handleTaskFailure(executionVertexId, error); + } + } + + private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) { + final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + maybeRestartTasks(failureHandlingResult); + } + + private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { + if (failureHandlingResult.canRestart()) { + restartTasksWithDelay(failureHandlingResult); + } else { + failJob(failureHandlingResult.getError()); + } + } + + private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { + final Set verticesToRestart = failureHandlingResult.getVerticesToRestart(); + + final Set executionVertexVersions = + new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart); + + delayExecutor.schedule( + () -> FutureUtils.assertNoException( + cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())), + failureHandlingResult.getRestartDelayMS(), + TimeUnit.MILLISECONDS); + } + + private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) { + return (Object ignored, Throwable throwable) -> { + + if (throwable == null) { + final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + schedulingStrategy.restartTasks(verticesToRestart); + } else { + failJob(throwable); +
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r327067464 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +128,293 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.log = log; + + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + + this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID())); + } + + // + // SchedulerNG + // + + @Override + protected void startSchedulingInternal() { + prepareExecutionGraphForScheduling(); + schedulingStrategy.startScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); + return true; + } + + return false; + } + + private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, 
final ExecutionVertexID executionVertexId) { + if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { + final Throwable error = taskExecutionState.getError(userCodeLoader); + handleTaskFailure(executionVertexId, error); + } + } + + private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) { + final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + maybeRestartTasks(failureHandlingResult); + } + + private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { + if (failureHandlingResult.canRestart()) { + restartTasksWithDelay(failureHandlingResult); + } else { + failJob(failureHandlingResult.getError()); + } + } + + private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { + final Set verticesToRestart = failureHandlingResult.getVerticesToRestart(); + + final Set executionVertexVersions = + new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart); + + delayExecutor.schedule( + () -> FutureUtils.assertNoException( + cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())), + failureHandlingResult.getRestartDelayMS(), + TimeUnit.MILLISECONDS); + } + + private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) { + return (Object ignored, Throwable throwable) -> { + + if (throwable == null) { + final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + schedulingStrategy.restartTasks(verticesToRestart); + } else { + failJob(throwable);
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326982305 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +137,281 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.restartBackoffTimeStrategy = restartBackoffTimeStrategy; + this.slotRequestTimeout = slotRequestTimeout; + this.slotProvider = slotProvider; + this.delayExecutor = delayExecutor; + this.userCodeLoader = userCodeLoader; + this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory); + this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = executionVertexVersioner; + this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner); + } + + // + // SchedulerNG + // + + @Override + public void startSchedulingInternal() { + initializeScheduling(); + schedulingStrategy.startScheduling(); + } + + private void initializeScheduling() { + executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID())); + prepareExecutionGraphForScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final 
ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); Review comment: Exactly.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326977860 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.Collections; + +final class ExecutionVertexSchedulingRequirementsMapper { + + public static ExecutionVertexSchedulingRequirements from(final ExecutionVertex executionVertex) { + + final ExecutionVertexID executionVertexId = new ExecutionVertexID( + executionVertex.getJobVertex().getJobVertexId(), + executionVertex.getParallelSubtaskIndex()); + + final AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation(); + final SlotSharingGroup slotSharingGroup = executionVertex.getJobVertex().getSlotSharingGroup(); + + return new ExecutionVertexSchedulingRequirements.Builder() + .withExecutionVertexId(executionVertexId) + .withPreviousAllocationId(latestPriorAllocation) + .withSlotSharingGroupId(slotSharingGroup == null ? null : slotSharingGroup.getSlotSharingGroupId()) + .withCoLocationConstraint(executionVertex.getLocationConstraint()) + .withPreferredLocations(getPreferredLocationBasedOnState(executionVertex)).build(); Review comment: I see. Thanks.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326899233 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +137,281 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.restartBackoffTimeStrategy = restartBackoffTimeStrategy; + this.slotRequestTimeout = slotRequestTimeout; + this.slotProvider = slotProvider; + this.delayExecutor = delayExecutor; + this.userCodeLoader = userCodeLoader; + this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory); + this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = executionVertexVersioner; + this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner); + } + + // + // SchedulerNG + // + + @Override + public void startSchedulingInternal() { + initializeScheduling(); + schedulingStrategy.startScheduling(); + } + + private void initializeScheduling() { + executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID())); + prepareExecutionGraphForScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final 
ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); Review comment: I don't think it is good that `schedulingStrategy.onExecutionStateChange` may change task states directly within this call:
* It can create call chains that are hard to reason about, which makes the code hard to maintain.
* A very long call chain may result in a stack overflow.
* Since we invoke `maybeHandleTaskFailure` right after `schedulingStrategy.onExecutionStateChange`, the task state may already have changed further down that call chain, so we would be doing failover handling in an unexpected state.

How about specifying that `SchedulerOperations#allocateSlotsAndDeploy` does not take effect synchronously when invoked, and instead scheduling the actions of `allocateSlotsAndDeploy` to run in the main thread? That way we can assume that no task state changes while `schedulingStrategy.onExecutionStateChange` is being invoked here.
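The proposal can be illustrated with a hypothetical, single-threaded model. Here `allocateSlotsAndDeploy` only enqueues work on a stand-in for the main-thread executor. As a result, `updateTaskExecutionState` can call the strategy callback and then run failure handling without any task state changing in between. `DeferredDeployScheduler` and its queue are assumptions for illustration, not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Hypothetical model: deploy actions are queued instead of run inline, so a
 * strategy callback cannot re-enter the scheduler and change state mid-call.
 */
final class DeferredDeployScheduler {

    /** Stand-in for a main-thread executor: tasks run later, one at a time, in FIFO order. */
    private final Queue<Runnable> mainThreadQueue = new ArrayDeque<>();
    private final List<String> events = new ArrayList<>();

    /** Called by the (hypothetical) scheduling strategy; deliberately asynchronous. */
    void allocateSlotsAndDeploy(String vertexId) {
        mainThreadQueue.add(() -> events.add("deploy " + vertexId));
    }

    /** Mirrors the order in updateTaskExecutionState: notify the strategy, then handle failure. */
    void updateTaskExecutionState(String vertexId, boolean failed) {
        events.add("state change " + vertexId);
        // The strategy reacts by requesting deployment of a downstream vertex; nothing runs yet.
        allocateSlotsAndDeploy(vertexId + "-downstream");
        if (failed) {
            // No deployment has happened, so failover sees exactly the reported state.
            events.add("handle failure " + vertexId);
        }
    }

    /** Drains the queue, as the main thread would after the current call returns. */
    void runMainThreadTasks() {
        Runnable task;
        while ((task = mainThreadQueue.poll()) != null) {
            task.run();
        }
    }

    List<String> events() {
        return events;
    }
}
```

Because deployment is deferred, the call stack of `updateTaskExecutionState` stays flat. The deploy actions run only after the current call has returned.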
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326901044 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.Collections; + +final class ExecutionVertexSchedulingRequirementsMapper { + + public static ExecutionVertexSchedulingRequirements from(final ExecutionVertex executionVertex) { + + final ExecutionVertexID executionVertexId = new ExecutionVertexID( + executionVertex.getJobVertex().getJobVertexId(), + executionVertex.getParallelSubtaskIndex()); + + final AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation(); + final SlotSharingGroup slotSharingGroup = executionVertex.getJobVertex().getSlotSharingGroup(); + + return new ExecutionVertexSchedulingRequirements.Builder() + .withExecutionVertexId(executionVertexId) + .withPreviousAllocationId(latestPriorAllocation) + .withSlotSharingGroupId(slotSharingGroup == null ? null : slotSharingGroup.getSlotSharingGroupId()) + .withCoLocationConstraint(executionVertex.getLocationConstraint()) + .withPreferredLocations(getPreferredLocationBasedOnState(executionVertex)).build(); Review comment: In the legacy scheduler, the preferred locations from `getPreferredLocationsBasedOnInputs` are used if `getPreferredLocationsBasedOnState` returns null. Are we intentionally deprecating location preferences based on inputs?
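A minimal sketch of the legacy fallback described above, assuming the state-based lookup returns `null` when there is no prior state. `PreferredLocationsSketch` is hypothetical, the two `Function` parameters stand in for `getPreferredLocationsBasedOnState` and `getPreferredLocationsBasedOnInputs`, and locations are plain strings.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;

final class PreferredLocationsSketch {
    private PreferredLocationsSketch() {
    }

    // Uses state-based locations when available and falls back to
    // input-based locations when the state-based lookup returns null.
    // Returning an empty collection when both are null is an assumption.
    static <V> Collection<String> resolve(
            V vertex,
            Function<V, Collection<String>> basedOnState,
            Function<V, Collection<String>> basedOnInputs) {
        Collection<String> fromState = basedOnState.apply(vertex);
        if (fromState != null) {
            return fromState;
        }
        Collection<String> fromInputs = basedOnInputs.apply(vertex);
        return fromInputs != null ? fromInputs : Collections.<String>emptyList();
    }
}
```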
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326897974 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +137,281 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy); + this.slotRequestTimeout = checkNotNull(slotRequestTimeout); + this.slotProvider = checkNotNull(slotProvider); + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory); + this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner); } + // + // SchedulerNG + // + @Override - public void startScheduling() { - throw new UnsupportedOperationException(); + public void startSchedulingInternal() { + initializeScheduling(); + schedulingStrategy.startScheduling(); + } + + private void initializeScheduling() { + executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID())); + prepareExecutionGraphForScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { 
+ final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); Review comment: By design, execution state changes should be notified via `SchedulingStrategy#onExecutionStateChange`. Similarly, all changes of partitions becoming consumable should be notified via `SchedulingStrategy#onPartitionConsumable`, as discussed in [another comment](https://github.com/apache/flink/pull/9663/files#r326540913). This is how we define this public interface, and I think we need to guarantee its correctness eventually. You are right that it is not a problem at the moment, that is, until we mark it as a public interface for users or other developers to customize the scheduling strategy. So I think we can skip it in this PR and do it in a follow-up task.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326894440 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.Collections; + +final class ExecutionVertexSchedulingRequirementsMapper { + + public static ExecutionVertexSchedulingRequirements from(final ExecutionVertex executionVertex) { + + final ExecutionVertexID executionVertexId = new ExecutionVertexID( Review comment: `ExecutionVertex` now caches its `ExecutionVertexID` and we can use it directly. 
I also noticed that some other scheduling/failover NG classes still create `ExecutionVertexID` themselves. I think we can fix them in a hotfix.
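The caching idea can be illustrated with simplified, hypothetical classes (`VertexId` and `Vertex` are not Flink's `ExecutionVertexID`/`ExecutionVertex`): the ID is built once in the constructor, and callers reuse it instead of reconstructing it from the job vertex ID and subtask index.

```java
import java.util.Objects;

// Hypothetical value class: equal when job vertex ID and subtask index match.
final class VertexId {
    private final String jobVertexId;
    private final int subtaskIndex;

    VertexId(String jobVertexId, int subtaskIndex) {
        this.jobVertexId = jobVertexId;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof VertexId)) {
            return false;
        }
        VertexId other = (VertexId) o;
        return subtaskIndex == other.subtaskIndex && jobVertexId.equals(other.jobVertexId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobVertexId, subtaskIndex);
    }
}

final class Vertex {
    // Built once and cached, so every caller gets the same instance.
    private final VertexId id;

    Vertex(String jobVertexId, int subtaskIndex) {
        this.id = new VertexId(jobVertexId, subtaskIndex);
    }

    VertexId getId() {
        return id;
    }
}
```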
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326894495 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -0,0 +1,746 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.SerializedInputSplit; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326891379 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1485,6 +1525,20 @@ else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failu } } + public void failJob(Throwable cause) { + if (state == JobStatus.FAILING || state.isGloballyTerminalState()) { + return; + } + + transitionState(JobStatus.FAILING); + initFailureCause(cause); + + cancelVerticesAsync().whenComplete((aVoid, throwable) -> { + transitionState(JobStatus.FAILED); Review comment: How about using `transitionState(state, JobStatus.FAILED, cause)`?
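A minimal sketch of a three-argument transition in the style of the suggested call: it checks the expected current state and carries the failure cause with the transition instead of requiring a separate step. `JobState` and `SketchJob` are hypothetical simplifications, not the `ExecutionGraph` API, and the sketch assumes single-threaded (main-thread) access.

```java
// Hypothetical, simplified job states; not Flink's JobStatus.
enum JobState { RUNNING, FAILING, FAILED, CANCELED }

// Hypothetical job holder, assumed to be accessed from a single (main) thread.
final class SketchJob {
    private JobState state = JobState.RUNNING;
    private Throwable failureCause;

    JobState getState() {
        return state;
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    // Moves to newState only if the job is still in 'expected'; the cause is
    // recorded as part of the same successful transition.
    boolean transitionState(JobState expected, JobState newState, Throwable cause) {
        if (state != expected) {
            return false;
        }
        state = newState;
        if (cause != null) {
            failureCause = cause;
        }
        return true;
    }
}
```

If the job has already reached another state, for example `CANCELED`, a transition that expects `FAILING` returns `false` and leaves both the state and the recorded cause untouched.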
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326892524 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1129,6 +1157,9 @@ void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID fai * @param t The exception that caused the failure. */ public void failGlobal(Throwable t) { + if (!isLegacyScheduling()) { + ExceptionUtils.rethrow(t); Review comment: Rethrowing this error may not work in some cases. `failGlobal` is used as a safety net to recover the job from an unexpected, inconsistent state. As I understand it, this method is not intended to throw any exceptions, and many of its callers do not handle exceptions from it. One problem I can identify is that `CheckpointFailureManager` would break with this change, as it depends on `failGlobal`. Another possible issue: previously, all errors that happened while applying execution state updates from TMs led to `failGlobal` for recovery; now these errors will cause the RPC to return an error response instead. Even if we rework all possible entry points to `failGlobal` (with DefaultScheduler), we still need a way to handle the errors. Maybe we need to implement a `failGlobal` mechanism that works for DefaultScheduler.
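One possible shape for such a mechanism, sketched under assumptions: `GlobalFailureHandler` and `SketchGlobalFailover` are hypothetical names, not Flink APIs. `failGlobal` hands the cause to a handler that would decide between restarting all tasks and failing the job, and it does not let runtime exceptions from that handler escape, so callers such as a checkpoint failure manager or an RPC handler need no error handling of their own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback through which a scheduler would react to a global
// failure, e.g. by restarting all tasks or failing the job.
interface GlobalFailureHandler {
    void handleGlobalFailure(Throwable cause);
}

// Hypothetical safety net in the spirit of failGlobal.
final class SketchGlobalFailover {
    private final GlobalFailureHandler handler;
    // Runtime exceptions raised by the handler itself; a real implementation
    // would need a last-resort reaction (logging, terminating, ...).
    private final List<Throwable> unhandled = new ArrayList<>();

    SketchGlobalFailover(GlobalFailureHandler handler) {
        this.handler = handler;
    }

    // Does not let runtime exceptions from the handler reach the caller.
    void failGlobal(Throwable cause) {
        try {
            handler.handleGlobalFailure(cause);
        } catch (RuntimeException e) {
            unhandled.add(e);
        }
    }

    List<Throwable> getUnhandled() {
        return unhandled;
    }
}
```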
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326888403 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +128,293 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.log = log; + + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + + this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID())); + } + + // + // SchedulerNG + // + + @Override + protected void startSchedulingInternal() { + prepareExecutionGraphForScheduling(); + schedulingStrategy.startScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); + return true; + } + + return false; + } + + private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, 
final ExecutionVertexID executionVertexId) { + if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { + final Throwable error = taskExecutionState.getError(userCodeLoader); + handleTaskFailure(executionVertexId, error); + } + } + + private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) { + final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + maybeRestartTasks(failureHandlingResult); + } + + private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { + if (failureHandlingResult.canRestart()) { + restartTasksWithDelay(failureHandlingResult); + } else { + failJob(failureHandlingResult.getError()); + } + } + + private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { + final Set verticesToRestart = failureHandlingResult.getVerticesToRestart(); + + final Set executionVertexVersions = + new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart); + + delayExecutor.schedule( + () -> FutureUtils.assertNoException( + cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())), + failureHandlingResult.getRestartDelayMS(), + TimeUnit.MILLISECONDS); + } + + private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) { + return (Object ignored, Throwable throwable) -> { + + if (throwable == null) { + final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + schedulingStrategy.restartTasks(verticesToRestart); + } else { + failJob(throwable); +
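The diff above cancels the affected tasks, waits for the restart delay, and then restarts only the vertices that have not been modified in the meantime. The bookkeeping behind `recordVertexModifications` and `getUnmodifiedExecutionVertices` can be sketched as a per-vertex counter. `SketchVersioner` and its `Version` class are hypothetical and only mirror the idea; they are not Flink's `ExecutionVertexVersioner`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class SketchVersioner {
    // Immutable (vertex, version) pair captured when a modification is recorded.
    static final class Version {
        final String vertexId;
        final long version;

        Version(String vertexId, long version) {
            this.vertexId = vertexId;
            this.version = version;
        }
    }

    private final Map<String, Long> currentVersions = new HashMap<>();

    // Bumps the version of each vertex and returns the new versions.
    Map<String, Version> recordVertexModifications(Set<String> vertices) {
        Map<String, Version> recorded = new HashMap<>();
        for (String vertexId : vertices) {
            long next = currentVersions.merge(vertexId, 1L, Long::sum);
            recorded.put(vertexId, new Version(vertexId, next));
        }
        return recorded;
    }

    // Returns the vertices whose version is still the recorded one, i.e. that
    // no later modification (such as another failover) has touched.
    Set<String> getUnmodifiedExecutionVertices(Set<Version> versions) {
        Set<String> unmodified = new HashSet<>();
        for (Version v : versions) {
            if (currentVersions.getOrDefault(v.vertexId, 0L) == v.version) {
                unmodified.add(v.vertexId);
            }
        }
        return unmodified;
    }
}
```

If a second failover touches vertex `b` before the delayed restart of `{a, b}` runs, only `a` is restarted by the first failover; `b` is left to the newer one.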
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326886263 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; + +import java.util.concurrent.CompletableFuture; + +/** + * Operations on the ExecutionVertex. + */ +interface ExecutionVertexOperations { + + void deploy(ExecutionVertex executionVertex) throws JobException; Review comment: An `ExecutionVertexID -> ExecutionVertex` function that references `SchedulerBase#getExecutionVertex` would be enough, rather than the entire `ExecutionGraph`. But that requires `DefaultScheduler` to be responsible for creating `DefaultExecutionVertexOperations`, which makes it harder to test the scheduler with `FailingExecutionVertexOperationsDecorator`. One solution I can come up with is to use a factory to create `ExecutionVertexOperations`.
I gave it a try in [this commit](https://github.com/zhuzhurk/flink/commit/21b09575fb5818c76313e6a73c8a826626e1bc0a). However, the changes were larger than I had expected, so I'll leave it to you to decide whether we should make them.
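A minimal sketch of the factory idea, with hypothetical simplified types; none of these names are Flink APIs, and the linked commit may look different. The factory receives a `Function<String, SimpleVertex>` lookup (standing in for a reference to `SchedulerBase#getExecutionVertex`) instead of the whole graph, and a test can supply a factory that wraps the operations in a failing decorator (standing in for `FailingExecutionVertexOperationsDecorator`). Unlike the real `deploy`, this sketch throws only unchecked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplified vertex; stands in for ExecutionVertex.
final class SimpleVertex {
    final String id;

    SimpleVertex(String id) {
        this.id = id;
    }
}

// Hypothetical stand-in for ExecutionVertexOperations.
interface VertexOperations {
    void deploy(SimpleVertex vertex);
}

// The scheduler would call create() with a lookup it owns, so the whole
// graph never has to be handed to the operations.
interface VertexOperationsFactory {
    VertexOperations create(Function<String, SimpleVertex> vertexLookup);
}

// Default operations: resolve the vertex through the lookup and record it.
final class RecordingOperations implements VertexOperations {
    final List<String> deployed = new ArrayList<>();
    private final Function<String, SimpleVertex> vertexLookup;

    RecordingOperations(Function<String, SimpleVertex> vertexLookup) {
        this.vertexLookup = vertexLookup;
    }

    @Override
    public void deploy(SimpleVertex vertex) {
        deployed.add(vertexLookup.apply(vertex.id).id);
    }
}

// Test decorator: fails deployments on demand, otherwise delegates.
final class FailingOperationsDecorator implements VertexOperations {
    private final VertexOperations delegate;
    boolean failDeploy;

    FailingOperationsDecorator(VertexOperations delegate) {
        this.delegate = delegate;
    }

    @Override
    public void deploy(SimpleVertex vertex) {
        if (failDeploy) {
            throw new IllegalStateException("Expected test deployment failure");
        }
        delegate.deploy(vertex);
    }
}
```

Production code would pass `RecordingOperations::new` (or its real equivalent) as the factory, while a test passes `lookup -> new FailingOperationsDecorator(new RecordingOperations(lookup))`.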
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326886164 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.Collections; + +final class ExecutionVertexSchedulingRequirementsMapper { Review comment: > Checkstyle does not complain about missing Javadocs if the class is not public

I see. I was not aware of that and thought it passed the build because the style checks are suppressed for these runtime packages.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326644539 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -23,40 +23,93 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy; import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import 
org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** - * Stub implementation of the future default scheduler. + * The future default scheduler. */ -public class DefaultScheduler extends LegacyScheduler { +public class DefaultScheduler extends SchedulerBase implements SchedulerOperations { + + private final Logger log; + + private final ClassLoader userCodeLoader; + + private final ExecutionSlotAllocator executionSlotAllocator; + + private final ExecutionFailureHandler executionFailureHandler; + + private final ScheduledExecutor delayExecutor; + + private final SchedulingStrategy schedulingStrategy; + + private final ExecutionVertexVersioner executionVertexVersioner; + + private final ExecutionVertexOperations executionVertexOperations; public DefaultScheduler( - final Logger log, Review comment: I think the benefit of double indentation is that it keeps the parameters of a method declaration from blending into the method body, which has single indentation. As you did, the confusion can also be reduced by adding an empty line between the two. I don't have a strong preference for this style, so it's fine not to change it if double indentation is not explicitly required.
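For reference, the two declaration styles discussed above look roughly like this. `ExampleScheduler` is a hypothetical class used only to compare indentation; it is not code from the PR.

```java
import java.util.Objects;

// Hypothetical class used only to compare the two parameter-indentation styles.
class ExampleScheduler {

	private final String jobName;
	private final int parallelism;

	// Style 1: parameters get two indentation levels (two tabs),
	// so they cannot be confused with the single-indented body.
	ExampleScheduler(
			final String jobName,
			final int parallelism) {
		this.jobName = Objects.requireNonNull(jobName);
		this.parallelism = parallelism;
	}

	// Style 2: parameters get one indentation level, and an empty line
	// separates the declaration from the body instead.
	static ExampleScheduler create(
		final String jobName,
		final int parallelism) {

		return new ExampleScheduler(jobName, parallelism);
	}

	String describe() {
		return jobName + " x" + parallelism;
	}
}
```

Both compile identically; the difference is purely visual, which is why either can be acceptable when the style guide does not mandate one.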
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326544032 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +128,293 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.log = log; + + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + + this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID())); + } + + // + // SchedulerNG + // + + @Override + protected void startSchedulingInternal() { + prepareExecutionGraphForScheduling(); + schedulingStrategy.startScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); + return true; + } + + return false; + } + + private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, 
final ExecutionVertexID executionVertexId) { + if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { + final Throwable error = taskExecutionState.getError(userCodeLoader); + handleTaskFailure(executionVertexId, error); + } + } + + private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) { + final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + maybeRestartTasks(failureHandlingResult); + } + + private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { + if (failureHandlingResult.canRestart()) { + restartTasksWithDelay(failureHandlingResult); + } else { + failJob(failureHandlingResult.getError()); + } + } + + private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { + final Set verticesToRestart = failureHandlingResult.getVerticesToRestart(); + + final Set executionVertexVersions = + new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart); + + delayExecutor.schedule( + () -> FutureUtils.assertNoException( + cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())), + failureHandlingResult.getRestartDelayMS(), + TimeUnit.MILLISECONDS); + } + + private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) { + return (Object ignored, Throwable throwable) -> { + + if (throwable == null) { + final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + schedulingStrategy.restartTasks(verticesToRestart); + } else { + failJob(throwable); +
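The versioning in `restartTasksWithDelay` guards against stale restarts: versions are recorded when the restart is planned, and after the delay only vertices whose version has not changed in the meantime are restarted. Below is a minimal sketch of such a versioner, assuming vertex IDs can be modeled as strings and versions as a per-vertex counter. `SimpleVertexVersioner` and its nested type are hypothetical and only mirror the method names used in the diff.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for ExecutionVertexVersioner.
class SimpleVertexVersioner {

	// (vertexId, version) pair captured when a restart is planned.
	static final class VertexVersion {
		final String vertexId;
		final long version;

		VertexVersion(String vertexId, long version) {
			this.vertexId = vertexId;
			this.version = version;
		}
	}

	private final Map<String, Long> currentVersions = new HashMap<>();

	// Bumps the version of each given vertex and returns the new versions.
	Map<String, VertexVersion> recordVertexModifications(Collection<String> vertexIds) {
		Map<String, VertexVersion> result = new HashMap<>();
		for (String id : vertexIds) {
			long next = currentVersions.merge(id, 1L, Long::sum);
			result.put(id, new VertexVersion(id, next));
		}
		return result;
	}

	// Returns the vertices that were NOT modified again after the given versions were recorded.
	Set<String> getUnmodifiedExecutionVertices(Set<VertexVersion> recorded) {
		Set<String> unmodified = new HashSet<>();
		for (VertexVersion v : recorded) {
			if (currentVersions.getOrDefault(v.vertexId, 0L) == v.version) {
				unmodified.add(v.vertexId);
			}
		}
		return unmodified;
	}
}
```

For example, if a restart of `a` and `b` is planned and `b` is modified again (say, by a second failure) before the delay elapses, only `a` is restarted by the first, now partially stale, restart.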
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326540913 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +128,293 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.log = log; + + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + + this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID())); + } + + // + // SchedulerNG + // + + @Override + protected void startSchedulingInternal() { + prepareExecutionGraphForScheduling(); + schedulingStrategy.startScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); + return true; + } + + return false; + } + + private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, 
final ExecutionVertexID executionVertexId) { + if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { + final Throwable error = taskExecutionState.getError(userCodeLoader); + handleTaskFailure(executionVertexId, error); + } + } + + private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) { + final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + maybeRestartTasks(failureHandlingResult); + } + + private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { + if (failureHandlingResult.canRestart()) { + restartTasksWithDelay(failureHandlingResult); + } else { + failJob(failureHandlingResult.getError()); + } + } + + private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { + final Set verticesToRestart = failureHandlingResult.getVerticesToRestart(); + + final Set executionVertexVersions = + new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart); + + delayExecutor.schedule( + () -> FutureUtils.assertNoException( + cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())), + failureHandlingResult.getRestartDelayMS(), + TimeUnit.MILLISECONDS); + } + + private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) { + return (Object ignored, Throwable throwable) -> { + + if (throwable == null) { + final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + schedulingStrategy.restartTasks(verticesToRestart); + } else { + failJob(throwable); +
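`restartTasksWithDelay` chains three steps. It cancels the affected tasks asynchronously and waits for the restart delay. Then, on the main thread, it either restarts or fails the job, depending on whether cancellation succeeded. Because the handler is attached only after the delay, the restart effectively waits for whichever finishes later, the delay or the cancellation. The sketch below reproduces that control flow with plain JDK types. It makes several assumptions: `ScheduledExecutorService` stands in for Flink's `ScheduledExecutor`, a caller-supplied `Executor` stands in for the main-thread executor, and `DelayedRestarter` with its `restart`/`failJob` callbacks is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the "cancel, wait, then restart or fail" flow.
class DelayedRestarter {

	private final ScheduledExecutorService delayExecutor;
	private final Executor mainThreadExecutor;

	DelayedRestarter(ScheduledExecutorService delayExecutor, Executor mainThreadExecutor) {
		this.delayExecutor = delayExecutor;
		this.mainThreadExecutor = mainThreadExecutor;
	}

	// Returns a future that completes after restart or failJob has run on the main-thread executor.
	CompletableFuture<Void> restartWithDelay(
			CompletableFuture<?> cancelFuture,
			long delayMillis,
			Runnable restart,
			Consumer<Throwable> failJob) {

		CompletableFuture<Void> done = new CompletableFuture<>();
		delayExecutor.schedule(() -> {
			// Runs after the delay; if cancellation is still in progress, handleAsync waits for it.
			cancelFuture.handleAsync((ignored, throwable) -> {
				if (throwable == null) {
					restart.run();
				} else {
					failJob.accept(throwable);
				}
				return null;
			}, mainThreadExecutor).whenComplete((unused, error) -> {
				// Surface unexpected errors instead of swallowing them.
				if (error != null) {
					done.completeExceptionally(error);
				} else {
					done.complete(null);
				}
			});
		}, delayMillis, TimeUnit.MILLISECONDS);
		return done;
	}
}
```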
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326523150 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.Collections; + +final class ExecutionVertexSchedulingRequirementsMapper { + + public static ExecutionVertexSchedulingRequirements from(final ExecutionVertex executionVertex) { + + final ExecutionVertexID executionVertexId = new ExecutionVertexID( + executionVertex.getJobVertex().getJobVertexId(), + executionVertex.getParallelSubtaskIndex()); + + final AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation(); + final SlotSharingGroup slotSharingGroup = executionVertex.getJobVertex().getSlotSharingGroup(); + + return new ExecutionVertexSchedulingRequirements.Builder() + .withExecutionVertexId(executionVertexId) + .withPreviousAllocationId(latestPriorAllocation) Review comment: The resource profile is not set.
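A missing resource profile is easy to overlook with a builder, because an unset field silently keeps its default. The following hypothetical sketch shows the pattern. It assumes the builder defaults the profile to an "unknown" value; `Requirements`, `ResourceSpec`, and the method names are illustrative stand-ins, not Flink's `ExecutionVertexSchedulingRequirements` API.

```java
import java.util.Objects;

// Hypothetical stand-in for a resource profile.
final class ResourceSpec {
	static final ResourceSpec UNKNOWN = new ResourceSpec(-1, -1);

	final double cpuCores;
	final int memoryMb;

	ResourceSpec(double cpuCores, int memoryMb) {
		this.cpuCores = cpuCores;
		this.memoryMb = memoryMb;
	}
}

// Hypothetical stand-in for the scheduling requirements built by the mapper.
final class Requirements {
	final String vertexId;
	final ResourceSpec resourceSpec;

	private Requirements(String vertexId, ResourceSpec resourceSpec) {
		this.vertexId = vertexId;
		this.resourceSpec = resourceSpec;
	}

	static final class Builder {
		private String vertexId;
		// If the caller never calls withResourceSpec, this default is used without any error.
		private ResourceSpec resourceSpec = ResourceSpec.UNKNOWN;

		Builder withVertexId(String vertexId) {
			this.vertexId = vertexId;
			return this;
		}

		Builder withResourceSpec(ResourceSpec resourceSpec) {
			this.resourceSpec = Objects.requireNonNull(resourceSpec);
			return this;
		}

		Requirements build() {
			return new Requirements(Objects.requireNonNull(vertexId), resourceSpec);
		}
	}
}
```

In the mapper, the fix would presumably be one more builder call that copies the vertex's resource profile into the requirements.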
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326512026 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.Preconditions; + +class DeploymentHandle { + + private final ExecutionVertexVersion requiredVertexVersion; + + private final ExecutionVertexDeploymentOption executionVertexDeploymentOption; + + private final SlotExecutionVertexAssignment slotExecutionVertexAssignment; + + public DeploymentHandle( + final ExecutionVertexVersion requiredVertexVersion, Review comment: Maybe use 2 indentations for the parameters here?
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326511036 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +128,293 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.log = log; + + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + + this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID())); + } + + // + // SchedulerNG + // + + @Override + protected void startSchedulingInternal() { + prepareExecutionGraphForScheduling(); + schedulingStrategy.startScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); + return true; + } + + return false; + } + + private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, 
final ExecutionVertexID executionVertexId) { + if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { + final Throwable error = taskExecutionState.getError(userCodeLoader); + handleTaskFailure(executionVertexId, error); + } + } + + private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) { + final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + maybeRestartTasks(failureHandlingResult); + } + + private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { + if (failureHandlingResult.canRestart()) { + restartTasksWithDelay(failureHandlingResult); + } else { + failJob(failureHandlingResult.getError()); + } + } + + private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { + final Set verticesToRestart = failureHandlingResult.getVerticesToRestart(); + + final Set executionVertexVersions = + new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart); + + delayExecutor.schedule( + () -> FutureUtils.assertNoException( + cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())), + failureHandlingResult.getRestartDelayMS(), + TimeUnit.MILLISECONDS); + } + + private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) { + return (Object ignored, Throwable throwable) -> { + + if (throwable == null) { + final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + schedulingStrategy.restartTasks(verticesToRestart); + } else { + failJob(throwable); +
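The state-update path in the diff has a simple shape. Updates for execution attempts the scheduler does not know are ignored (the method returns `false`). Otherwise the new state is recorded and the scheduling strategy is notified. Only for `FAILED` is the failure handler asked whether to restart with a delay or fail the job. Below is a condensed sketch with hypothetical types: `State`, `FailureDecision`, and string IDs stand in for Flink's `ExecutionState`, `FailureHandlingResult`, and `ExecutionVertexID`, and the side effects are recorded as strings instead of being executed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, condensed model of updateTaskExecutionState / maybeHandleTaskFailure.
class StateUpdateDispatcher {

	enum State { RUNNING, FINISHED, FAILED }

	// Hypothetical stand-in for FailureHandlingResult.
	static final class FailureDecision {
		final boolean canRestart;

		FailureDecision(boolean canRestart) {
			this.canRestart = canRestart;
		}
	}

	private final Map<String, String> attemptToVertex = new HashMap<>();
	private final Function<String, FailureDecision> failureHandler;
	// Records what the scheduler would do, in order.
	final List<String> events = new ArrayList<>();

	StateUpdateDispatcher(Function<String, FailureDecision> failureHandler) {
		this.failureHandler = failureHandler;
	}

	void registerAttempt(String attemptId, String vertexId) {
		attemptToVertex.put(attemptId, vertexId);
	}

	boolean updateTaskExecutionState(String attemptId, State newState) {
		String vertexId = attemptToVertex.get(attemptId);
		if (vertexId == null) {
			return false; // attempt not known to the scheduler: nothing to update
		}
		events.add("stateChange:" + vertexId + ":" + newState);
		if (newState == State.FAILED) {
			FailureDecision decision = failureHandler.apply(vertexId);
			events.add(decision.canRestart ? "restartWithDelay:" + vertexId : "failJob");
		}
		return true;
	}
}
```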
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326502860 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.Preconditions; + +class DeploymentHandle { Review comment: We should add Javadoc for this class.
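To illustrate the requested Javadoc, a class-level comment could describe what the handle bundles. The sketch below is hypothetical. The wording is an assumption based only on the three fields visible in the diff, the stand-in field types (`long`, `String`) replace Flink's `ExecutionVertexVersion`, `ExecutionVertexDeploymentOption`, and `SlotExecutionVertexAssignment`, and the `isCurrent` helper is invented for illustration.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of a documented deployment handle.
 *
 * <p>Bundles what the scheduler needs to deploy one execution vertex: the vertex
 * version that must still be current at deployment time, the vertex to deploy,
 * and the slot assigned to it.
 */
class DeploymentHandleSketch {

	/** Version the vertex must still have for the deployment to proceed. */
	private final long requiredVertexVersion;

	/** Identifier of the vertex to deploy (stand-in for the deployment option). */
	private final String vertexId;

	/** Name of the assigned slot (stand-in for the slot assignment). */
	private final String slotName;

	DeploymentHandleSketch(
			final long requiredVertexVersion,
			final String vertexId,
			final String slotName) {
		this.requiredVertexVersion = requiredVertexVersion;
		this.vertexId = Objects.requireNonNull(vertexId);
		this.slotName = Objects.requireNonNull(slotName);
	}

	/**
	 * Returns whether the vertex is unchanged since this handle was created
	 * (hypothetical helper, not part of the PR).
	 */
	boolean isCurrent(long currentVertexVersion) {
		return currentVertexVersion == requiredVertexVersion;
	}

	String getVertexId() {
		return vertexId;
	}

	String getSlotName() {
		return slotName;
	}
}
```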
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326495983 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalTaskFailuresListener.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Calls {@link SchedulerNG#updateTaskExecutionState(TaskExecutionState)} on task failure. + */ +class UpdateSchedulerNgOnInternalTaskFailuresListener implements InternallyDetectedTaskFailuresListener { + + private final SchedulerNG schedulerNg; + + private final JobID jobId; + + public UpdateSchedulerNgOnInternalTaskFailuresListener( + final SchedulerNG schedulerNg, Review comment: Maybe use 2 indentations for the parameters here?
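The listener's Javadoc describes an adapter. An internally detected task failure is turned into a `FAILED` state update and passed to `SchedulerNG#updateTaskExecutionState`, so internal failures presumably go through the same handling path as reported ones. Below is a minimal sketch of that adapter shape. The interfaces and the string-based update object are hypothetical stand-ins for Flink's `SchedulerNG`, `InternallyDetectedTaskFailuresListener`, and `TaskExecutionState`, and the method name `notifyTaskFailure` is an assumption.

```java
import java.util.Objects;

// Hypothetical stand-in for the part of SchedulerNG the listener needs.
interface StateUpdateReceiver {
	boolean updateTaskExecutionState(StateUpdate update);
}

// Hypothetical stand-in for InternallyDetectedTaskFailuresListener.
interface TaskFailureListener {
	void notifyTaskFailure(String attemptId, Throwable cause);
}

// Hypothetical stand-in for TaskExecutionState.
final class StateUpdate {
	final String jobId;
	final String attemptId;
	final String state;
	final Throwable error;

	StateUpdate(String jobId, String attemptId, String state, Throwable error) {
		this.jobId = jobId;
		this.attemptId = attemptId;
		this.state = state;
		this.error = error;
	}
}

// Adapter: converts an internally detected failure into a FAILED state update.
final class ForwardingFailureListener implements TaskFailureListener {

	private final StateUpdateReceiver scheduler;

	private final String jobId;

	ForwardingFailureListener(
			final StateUpdateReceiver scheduler,
			final String jobId) {
		this.scheduler = Objects.requireNonNull(scheduler);
		this.jobId = Objects.requireNonNull(jobId);
	}

	@Override
	public void notifyTaskFailure(String attemptId, Throwable cause) {
		scheduler.updateTaskExecutionState(new StateUpdate(jobId, attemptId, "FAILED", cause));
	}
}
```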
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326502137 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; + +import java.util.concurrent.CompletableFuture; + +/** + * Operations on the ExecutionVertex. + */ +interface ExecutionVertexOperations { + + void deploy(ExecutionVertex executionVertex) throws JobException; Review comment: ExecutionVertexID would be a better parameter type for this interface.
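One plausible reason to take an ID rather than an `ExecutionVertex` is that it keeps the interface free of execution-graph internals. The default implementation resolves the ID itself, while test implementations can simply record IDs. The following sketch of that design rests on assumptions: `VertexOperations`, `Vertex`, and the lookup function are hypothetical simplifications, and `deploy` throws a plain checked `Exception` in place of Flink's `JobException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical ID-based variant of ExecutionVertexOperations.
interface VertexOperations {
	void deploy(String vertexId) throws Exception;
}

// Hypothetical stand-in for ExecutionVertex.
final class Vertex {
	final String id;
	boolean deployed;

	Vertex(String id) {
		this.id = id;
	}
}

// Production-style implementation: resolves the ID, then operates on the vertex.
final class LookupVertexOperations implements VertexOperations {
	private final Function<String, Vertex> lookup;

	LookupVertexOperations(Function<String, Vertex> lookup) {
		this.lookup = Objects.requireNonNull(lookup);
	}

	@Override
	public void deploy(String vertexId) throws Exception {
		Vertex vertex = lookup.apply(vertexId);
		if (vertex == null) {
			throw new Exception("Unknown vertex " + vertexId);
		}
		vertex.deployed = true;
	}
}

// Test double: only records which IDs were deployed; no execution graph needed.
final class RecordingVertexOperations implements VertexOperations {
	final List<String> deployedIds = new ArrayList<>();

	@Override
	public void deploy(String vertexId) {
		deployedIds.add(vertexId);
	}
}
```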
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326495736 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -23,40 +23,93 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy; import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import 
org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** - * Stub implementation of the future default scheduler. + * The future default scheduler. */ -public class DefaultScheduler extends LegacyScheduler { +public class DefaultScheduler extends SchedulerBase implements SchedulerOperations { + + private final Logger log; + + private final ClassLoader userCodeLoader; + + private final ExecutionSlotAllocator executionSlotAllocator; + + private final ExecutionFailureHandler executionFailureHandler; + + private final ScheduledExecutor delayExecutor; + + private final SchedulingStrategy schedulingStrategy; + + private final ExecutionVertexVersioner executionVertexVersioner; + + private final ExecutionVertexOperations executionVertexOperations; public DefaultScheduler( - final Logger log, Review comment: Should we use 2 indentations for the parameters in method declarations, as suggested in the Flink [code style doc](https://flink.apache.org/contributing/code-style-and-quality-formatting.html)?
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326502296 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; + +import java.util.concurrent.CompletableFuture; + +class DefaultExecutionVertexOperations implements ExecutionVertexOperations { Review comment: We need a java doc for it.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326499609 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.Collections; + +final class ExecutionVertexSchedulingRequirementsMapper { Review comment: We need a java doc for it.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326482996 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +class SubmissionTrackingTaskManagerGateway extends SimpleAckingTaskManagerGateway { Review comment: Could you add a Javadoc for this class?
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326485570 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/NeverRestartBackoffTimeStrategy.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +/** + * Restart backoff time strategy that never restarts. + */ +public enum NeverRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy { + + INSTANCE; + + @Override + public boolean canRestart() { + return false; + } + + @Override + public long getBackoffTime() { + return 0; + } + + @Override + public void notifyFailure(final Throwable cause) { + Review comment: The empty line is not needed.
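A minimal sketch of the enum with the blank line removed, so that the no-op `notifyFailure` has an empty body. The `RestartBackoffTimeStrategy` interface below is a local stand-in declaring only the three methods visible in the diff; the real Flink interface may differ.

```java
/** Hypothetical stand-in declaring only the methods shown in the diff. */
interface RestartBackoffTimeStrategy {

	boolean canRestart();

	long getBackoffTime();

	void notifyFailure(Throwable cause);
}

/** Restart backoff time strategy that never restarts. */
enum NeverRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {

	INSTANCE;

	@Override
	public boolean canRestart() {
		return false;
	}

	@Override
	public long getBackoffTime() {
		return 0L;
	}

	// Intentionally a no-op, written without an empty line in the body.
	@Override
	public void notifyFailure(final Throwable cause) {
	}
}
```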
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326489325 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ## @@ -42,6 +42,23 @@ */ public interface SlotProvider { + /** +* Allocating slot with specific requirement. +* +* @param slotRequestId identifying the slot request +* @param scheduledUnit The task to allocate the slot for +* @param slotProfile profile of the requested slot +* @param allocationTimeout after which the allocation fails with a timeout exception +* @return The future of the allocation +*/ + default CompletableFuture allocateSlot( Review comment: It seems this method is only used by tests. Do we really need it, or can we make the tests invoke the underlying method directly?
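To illustrate the question, a minimal sketch of the pattern under discussion: a `default` convenience overload that only fixes one argument and delegates to the full method, which tests could call directly instead. `SimpleSlotProvider`, its `allowQueuedScheduling` flag, and the use of `String` for slots are hypothetical simplifications, not Flink's `SlotProvider` API.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified slot provider; slots are represented as strings. */
interface SimpleSlotProvider {

	/** The underlying method, which callers (including tests) can invoke directly. */
	CompletableFuture<String> allocateSlot(String requestId, boolean allowQueuedScheduling, long timeoutMillis);

	/**
	 * Convenience overload of the kind questioned in the review: it only fixes
	 * one argument and delegates to the method above.
	 */
	default CompletableFuture<String> allocateSlot(String requestId, long timeoutMillis) {
		return allocateSlot(requestId, true, timeoutMillis);
	}
}
```

If the overload exists only for tests, those tests can pass the extra argument explicitly and the interface keeps a single abstract entry point.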
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326484256 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +class SubmissionTrackingTaskManagerGateway extends SimpleAckingTaskManagerGateway { + + private final BlockingQueue<TaskDeploymentDescriptor> taskDeploymentDescriptors = new LinkedBlockingDeque<>(); + + public void setFailSubmission(final boolean failSubmission) { + this.failSubmission = failSubmission; + } + + private boolean failSubmission; Review comment: It's better to put the field definition at the top of the class.
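A minimal sketch of the suggested ordering, with all fields declared at the top of the class, before the setter that uses them. `SubmissionTrackingGatewaySketch` is a hypothetical, simplified stand-in: it records descriptors as plain strings instead of `TaskDeploymentDescriptor`s and does not extend `SimpleAckingTaskManagerGateway`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/** Hypothetical, simplified gateway that records submitted task descriptors. */
class SubmissionTrackingGatewaySketch {

	// All fields come first, before any methods.
	private final BlockingQueue<String> taskDeploymentDescriptors = new LinkedBlockingDeque<>();

	private boolean failSubmission;

	public void setFailSubmission(final boolean failSubmission) {
		this.failSubmission = failSubmission;
	}

	/** Records the descriptor unless submissions are set to fail. */
	boolean submitTask(final String descriptor) {
		if (failSubmission) {
			return false;
		}
		// The deque is unbounded, so offer() always succeeds here.
		return taskDeploymentDescriptors.offer(descriptor);
	}

	int numSubmitted() {
		return taskDeploymentDescriptors.size();
	}
}
```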
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326487486 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java ## @@ -44,13 +44,18 @@ /** Config name for the {@link RestartPipelinedRegionStrategy}. */ public static final String LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-legacy"; + /** Config name for the {@link NoOpFailoverStrategy} */ + public static final String NO_OP_FAILOVER_STRATEGY = "noop"; + // /** * Loads a FailoverStrategy Factory from the given configuration. */ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config, @Nullable Logger logger) { - final String strategyParam = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY); + final String strategyParam = config.getString(JobManagerOptions.SCHEDULER).equals("ng") ? + NO_OP_FAILOVER_STRATEGY : Review comment: How about a `ThrowingFailoverStrategy` here, similar to `ThrowingRestartStrategy`? We do not expect the strategy to be used after its creation, so a `ThrowingFailoverStrategy` can help to find potential issues. We should also guard the `failoverStrategy#notifyNewVertices` invocation in `ExecutionGraph#attachJobGraph` with an `isLegacyScheduling()` check.
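A minimal sketch of the suggestion, assuming a simplified, hypothetical failover-strategy interface with task IDs as strings: a placeholder strategy that fails fast on any call, plus a caller that only notifies the strategy when legacy scheduling is in use. None of these names are Flink classes; the sketch only illustrates the fail-fast idea and the guard.

```java
import java.util.List;

/** Hypothetical, simplified failover strategy used by the legacy scheduler. */
interface LegacyFailoverStrategySketch {

	void onTaskFailure(String failedTask, Throwable cause);

	void notifyNewVertices(List<String> newVertices);
}

/** Placeholder that must never be used: every call fails fast. */
enum ThrowingFailoverStrategySketch implements LegacyFailoverStrategySketch {

	INSTANCE;

	@Override
	public void onTaskFailure(String failedTask, Throwable cause) {
		throw new IllegalStateException("Unexpected onTaskFailure() call");
	}

	@Override
	public void notifyNewVertices(List<String> newVertices) {
		throw new IllegalStateException("Unexpected notifyNewVertices() call");
	}
}

/** Hypothetical caller mirroring the guard suggested for attaching a job graph. */
class AttachJobGraphSketch {

	static void attach(boolean legacyScheduling, LegacyFailoverStrategySketch strategy, List<String> vertices) {
		// Only the legacy scheduler relies on this failover strategy.
		if (legacyScheduling) {
			strategy.notifyNewVertices(vertices);
		}
	}
}
```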
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326483593 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +class SubmissionTrackingTaskManagerGateway extends SimpleAckingTaskManagerGateway { + + private final BlockingQueue<TaskDeploymentDescriptor> taskDeploymentDescriptors = new LinkedBlockingDeque<>(); + + public void setFailSubmission(final boolean failSubmission) { Review comment: This method is never used. It seems we lack a test for submission failure.
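A minimal sketch of what a submission-failure test could exercise, assuming a hypothetical gateway whose `submitTask` returns an exceptionally completed future once `setFailSubmission(true)` has been called. A real test would plug `SubmissionTrackingTaskManagerGateway` into the `DefaultScheduler` and assert how the scheduler reacts; that part is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical gateway whose submissions can be forced to fail. */
class FailableGatewaySketch {

	private boolean failSubmission;

	void setFailSubmission(boolean failSubmission) {
		this.failSubmission = failSubmission;
	}

	CompletableFuture<String> submitTask(String descriptor) {
		if (failSubmission) {
			CompletableFuture<String> failed = new CompletableFuture<>();
			failed.completeExceptionally(new RuntimeException("Forced submission failure"));
			return failed;
		}
		return CompletableFuture.completedFuture("ack:" + descriptor);
	}
}

/** Test-style check that a failed submission is surfaced to the caller. */
class SubmissionFailureCheck {

	static boolean submissionFails(FailableGatewaySketch gateway) {
		gateway.setFailSubmission(true);
		try {
			gateway.submitTask("task-1").join();
			return false;
		} catch (CompletionException e) {
			// join() wraps the original exception as the cause.
			return "Forced submission failure".equals(e.getCause().getMessage());
		}
	}
}
```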
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r326241629 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java ## @@ -66,14 +77,30 @@ public SchedulerNG createInstance( jobMasterConfiguration, slotProvider, futureExecutor, + new ScheduledExecutorServiceAdapter(futureExecutor), userCodeLoader, checkpointRecoveryFactory, rpcTimeout, blobWriter, jobManagerJobMetricGroup, slotRequestTimeout, shuffleMaster, - partitionTracker); + partitionTracker, + schedulingStrategyFactory, + new RestartPipelinedRegionStrategy.Factory(), Review comment: I opened FLINK-14131 for this. We can do it after this PR is done. And we will need to consider then whether to respect the existing "jobmanager.execution.failover-strategy" config.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r325636828 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -1179,11 +1197,19 @@ private void sendPartitionInfos() { // Internal Actions // + private boolean isLegacyScheduling() { + return getVertex().isLegacyScheduling(); + } + private boolean processFail(Throwable t, boolean isCallback) { return processFail(t, isCallback, null, null, true); } private boolean processFail(Throwable t, boolean isCallback, Map> userAccumulators, IOMetrics metrics, boolean releasePartitions) { Review comment: This method is only invoked by `processFail(Throwable t, boolean isCallback)`. I think it's not needed anymore.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r325634779 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -1032,9 +1046,13 @@ void markFailed(Throwable t) { } void markFailed(Throwable t, Map> userAccumulators, IOMetrics metrics) { Review comment: This method is used by tests only. We may replace it or mark it with @VisibleForTesting.
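A minimal sketch of the second option, assuming a locally declared `@VisibleForTesting` annotation as a stand-in for Flink's `org.apache.flink.annotation.VisibleForTesting`, so that the example compiles on its own. `ExecutionSketch` and its `markFailed` overloads are hypothetical simplifications of the methods in the diff.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/** Local stand-in for Flink's VisibleForTesting annotation. */
@Documented
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@interface VisibleForTesting {
}

/** Hypothetical class: the extra overload exists only so that tests can call it. */
class ExecutionSketch {

	private String failureCause;

	void markFailed(Throwable t) {
		markFailed(t, null);
	}

	// Only tests pass extra details; the annotation documents that intent.
	@VisibleForTesting
	void markFailed(Throwable t, String details) {
		failureCause = details == null ? t.getMessage() : t.getMessage() + " (" + details + ")";
	}

	String getFailureCause() {
		return failureCause;
	}
}
```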
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r325660299 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -1032,9 +1046,13 @@ void markFailed(Throwable t) { } void markFailed(Throwable t, Map> userAccumulators, IOMetrics metrics) { + markFailed(t, userAccumulators, metrics, false); + } + + void markFailed(Throwable t, Map> userAccumulators, IOMetrics metrics, boolean fromSchedulerNg) { // skip release of partitions since this is only called if the TM actually sent the FAILED state update Review comment: The comment seems to be outdated. With the changes in this PR, its assumption no longer holds, since the failure may also be triggered internally by the JM. This may not be a problem, though, because such errors originate from `processFail(Throwable t, boolean isCallback)`, which triggers the release of the partitions earlier.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324658029 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java ## @@ -66,14 +77,30 @@ public SchedulerNG createInstance( jobMasterConfiguration, slotProvider, futureExecutor, + new ScheduledExecutorServiceAdapter(futureExecutor), userCodeLoader, checkpointRecoveryFactory, rpcTimeout, blobWriter, jobManagerJobMetricGroup, slotRequestTimeout, shuffleMaster, - partitionTracker); + partitionTracker, + schedulingStrategyFactory, + new RestartPipelinedRegionStrategy.Factory(), Review comment: And a FastRestartPipelinedRegionStrategy is introduced in FLINK-13056 as an option for faster failover processing.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324593492 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java ## @@ -66,14 +77,30 @@ public SchedulerNG createInstance( jobMasterConfiguration, slotProvider, futureExecutor, + new ScheduledExecutorServiceAdapter(futureExecutor), userCodeLoader, checkpointRecoveryFactory, rpcTimeout, blobWriter, jobManagerJobMetricGroup, slotRequestTimeout, shuffleMaster, - partitionTracker); + partitionTracker, + schedulingStrategyFactory, + new RestartPipelinedRegionStrategy.Factory(), Review comment: @tillrohrmann An individual restart strategy is needed if we want to restart only the failed task for recovery. Alternatively, we could provide a pluggable mechanism so that developers can implement it outside of the Flink core. Here's the [ML](https://lists.apache.org/thread.html/237de1061283a18dd813043a84d5d1debb15d1220368fa2f5b801025@%3Cdev.flink.apache.org%3E) thread on individual failover.
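A minimal sketch of what an individual strategy could look like, assuming a simplified, hypothetical interface that maps a failed task to the set of tasks to restart (task IDs as strings). It restarts only the failed task and makes no attempt to handle data exchanged with its upstream or downstream tasks.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical, simplified failover strategy: decides which tasks to restart. */
interface TaskFailoverStrategy {

	Set<String> getTasksNeedingRestart(String failedTaskId, Throwable cause);
}

/** Restarts only the failed task, regardless of how it is connected to other tasks. */
class RestartIndividualTaskStrategy implements TaskFailoverStrategy {

	@Override
	public Set<String> getTasksNeedingRestart(String failedTaskId, Throwable cause) {
		return Collections.singleton(failedTaskId);
	}
}
```

A pluggable variant would let users supply their own `TaskFailoverStrategy` implementation instead of hard-coding one in the scheduler factory.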
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324528408 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +137,281 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy); + this.slotRequestTimeout = checkNotNull(slotRequestTimeout); + this.slotProvider = checkNotNull(slotProvider); + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory); + this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner); } + // + // SchedulerNG + // + @Override - public void startScheduling() { - throw new UnsupportedOperationException(); + public void startSchedulingInternal() { + initializeScheduling(); + schedulingStrategy.startScheduling(); + } + + private void initializeScheduling() { + executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID())); + prepareExecutionGraphForScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { 
+ final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); Review comment: It seems that, this way, the schedulingStrategy is not aware of internal state changes such as DEPLOYING or SCHEDULED, and can only trigger scheduling upon external state changes such as RUNNING or FINISHED. State changes like CANCELED or FAILED would be notified when they are reported by the TaskManager, but not when they result from internal scheduler errors.
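One possible way to address the concern, sketched with hypothetical, simplified types (these are not Flink's `ExecutionState` or `SchedulingStrategy`): route every state transition, whether reported by a TaskManager or caused inside the scheduler, through a single method that notifies the strategy.

```java
/** Hypothetical, simplified execution states. */
enum TaskStateSketch { SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELED, FAILED }

/** Hypothetical listener standing in for the scheduling strategy. */
interface StateChangeListener {

	void onExecutionStateChange(String vertexId, TaskStateSketch newState);
}

/** Funnels external and internal transitions through one notification path. */
class StateTransitionRouter {

	private final StateChangeListener listener;

	StateTransitionRouter(StateChangeListener listener) {
		this.listener = listener;
	}

	/** Called for state updates reported by a TaskManager. */
	void onTaskManagerUpdate(String vertexId, TaskStateSketch state) {
		transition(vertexId, state);
	}

	/** Called when the scheduler itself changes a vertex's state, e.g. on a deployment error. */
	void onInternalTransition(String vertexId, TaskStateSketch state) {
		transition(vertexId, state);
	}

	private void transition(String vertexId, TaskStateSketch state) {
		// A real scheduler would update its execution graph before notifying.
		listener.onExecutionStateChange(vertexId, state);
	}
}
```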
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324526676 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -1211,7 +1231,10 @@ private boolean processFail(Throwable t, boolean isCallback, Maphttps://github.com/apache/flink/pull/9663#pullrequestreview-288377060).
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324476406 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +137,281 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy); + this.slotRequestTimeout = checkNotNull(slotRequestTimeout); + this.slotProvider = checkNotNull(slotProvider); + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory); + this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner); } + // + // SchedulerNG + // + @Override - public void startScheduling() { - throw new UnsupportedOperationException(); + public void startSchedulingInternal() { Review comment: No need to be public.
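A minimal sketch of the suggestion, assuming a template-method layout in which the base class's public entry point calls the hook: the hook can then be `protected`. `SchedulerBaseSketch` and `DefaultSchedulerSketch` are hypothetical and only mirror the shape of `SchedulerBase` and `DefaultScheduler`.

```java
/** Hypothetical base class: the public entry point calls a protected hook. */
abstract class SchedulerBaseSketch {

	private boolean started;

	public final void startScheduling() {
		// Shared bookkeeping lives in the base class.
		started = true;
		startSchedulingInternal();
	}

	// Only the base class calls this hook, so protected visibility is enough.
	protected abstract void startSchedulingInternal();

	boolean isStarted() {
		return started;
	}
}

/** Hypothetical subclass overriding the hook without widening its visibility. */
class DefaultSchedulerSketch extends SchedulerBaseSketch {

	private int startCount;

	@Override
	protected void startSchedulingInternal() {
		startCount++;
	}

	int getStartCount() {
		return startCount;
	}
}
```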
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324505819 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -0,0 +1,727 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.TaskFailureListener; +import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.SerializedInputSplit; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter; +import
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324502833 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java ## @@ -66,14 +77,30 @@ public SchedulerNG createInstance( jobMasterConfiguration, slotProvider, futureExecutor, + new ScheduledExecutorServiceAdapter(futureExecutor), userCodeLoader, checkpointRecoveryFactory, rpcTimeout, blobWriter, jobManagerJobMetricGroup, slotRequestTimeout, shuffleMaster, - partitionTracker); + partitionTracker, + schedulingStrategyFactory, + new RestartPipelinedRegionStrategy.Factory(), Review comment: It may be a bit out of scope for this PR, but I think we may need a failover strategy factory loader to support multiple failover strategies. I'm also thinking of migrating the restart-all strategy. The restart-all strategy may be useful in cases where users want the job vertices to be either all running or none running. Besides, if we'd like to support a single-vertex restart strategy in the future, we may need an individual restart strategy, or simply make the failover strategy pluggable so that there is a way to do so. Anyhow, this is out of scope for this PR. I can work on it if we agree to have it.
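A failover strategy factory loader along the lines suggested above could look like the following sketch. Everything in it is hypothetical: the interfaces `SketchFailoverStrategy` and `SketchFailoverStrategyFactory`, the loader class, and the configuration values `"full"` and `"individual"` are illustrative stand-ins rather than Flink APIs. Vertices are modelled as plain strings, and a pipelined-region strategy is left out because it would need the failover topology.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical strategy interface: given a failed vertex, decide which vertices to restart.
// Vertices are modelled as plain strings to keep the sketch self-contained.
interface SketchFailoverStrategy {
    Set<String> getTasksNeedingRestart(String failedVertex);
}

// Hypothetical factory interface, so a strategy can be built for each job's vertex set.
interface SketchFailoverStrategyFactory {
    SketchFailoverStrategy create(Set<String> allVertices);
}

// Hypothetical loader that resolves a configuration value to a factory.
final class SketchFailoverStrategyLoader {

    private SketchFailoverStrategyLoader() {}

    static SketchFailoverStrategyFactory load(String configValue) {
        switch (configValue.trim().toLowerCase(Locale.ROOT)) {
            case "full":
                // Restart-all: every vertex is restarted together, whichever one failed.
                return allVertices -> failedVertex -> new HashSet<>(allVertices);
            case "individual":
                // Single-vertex restart: only the failed vertex is restarted.
                return allVertices -> failedVertex -> Collections.singleton(failedVertex);
            default:
                throw new IllegalArgumentException("Unknown failover strategy: " + configValue);
        }
    }
}
```

Keeping the choice behind a factory means the scheduler only depends on the factory interface, so adding another strategy would touch the loader rather than the scheduler itself.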
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324477116 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +137,281 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy); + this.slotRequestTimeout = checkNotNull(slotRequestTimeout); + this.slotProvider = checkNotNull(slotProvider); + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory); + this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner); } + // + // SchedulerNG + // + @Override - public void startScheduling() { - throw new UnsupportedOperationException(); + public void startSchedulingInternal() { + initializeScheduling(); + schedulingStrategy.startScheduling(); + } + + private void initializeScheduling() { + executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID())); + prepareExecutionGraphForScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { 
+ final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); + return true; + } + + return false; + } + + private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) { + if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { + final Throwable error = taskExecutionState.getError(userCodeLoader); + handleTaskFailure(executionVertexId, error); + } + } + + private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) { + final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + maybeRestartTasks(failureHandlingResult); + } + + private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { + if (failureHandlingResult.canRestart()) { + restartTasksWithDelay(failureHandlingResult); + } else { + failJob(failureHandlingResult.getError()); + } + } + + private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { + final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart(); + + final Set<ExecutionVertexVersion> executionVertexVersions = + new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart); + + delayExecutor.schedule( Review comment: This makes the delay the "time span between the task failure happening and restarting the tasks". Previously it was the "time span between completing affected tasks
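The difference between the two delay semantics can be sketched with `CompletableFuture` and a `ScheduledExecutorService`. This assumes the truncated `delayExecutor.schedule(` call chains the restart onto `cancelFuture` once the delay has elapsed; `SketchRestartDelay` and both method names are hypothetical. In variant A the restart runs once both the delay (counted from the failure) and the cancellation have completed; in variant B the delay only starts counting after the cancellation completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper contrasting two ways of applying a restart backoff delay.
final class SketchRestartDelay {

    private SketchRestartDelay() {}

    // Variant A: the delay is counted from the moment the failure is handled.
    // The restart runs once BOTH the delay has elapsed AND cancellation has completed,
    // so a slow cancellation consumes part (or all) of the delay.
    static CompletableFuture<Void> delayFromFailure(
            ScheduledExecutorService executor,
            CompletableFuture<?> cancelFuture,
            long delayMillis,
            Runnable restart) {
        CompletableFuture<Void> delayElapsed = new CompletableFuture<>();
        executor.schedule(() -> { delayElapsed.complete(null); }, delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.allOf(delayElapsed, cancelFuture).thenRun(restart);
    }

    // Variant B: the delay is counted from the moment all affected tasks are cancelled.
    static CompletableFuture<Void> delayFromCancellation(
            ScheduledExecutorService executor,
            CompletableFuture<?> cancelFuture,
            long delayMillis,
            Runnable restart) {
        CompletableFuture<Void> restarted = new CompletableFuture<>();
        // If cancelFuture completes exceptionally, this sketch never restarts and the
        // returned future never completes.
        cancelFuture.thenRun(() -> executor.schedule(() -> {
            try {
                restart.run();
                restarted.complete(null);
            } catch (RuntimeException e) {
                restarted.completeExceptionally(e);
            }
        }, delayMillis, TimeUnit.MILLISECONDS));
        return restarted;
    }
}
```

With variant A, if cancelling the affected tasks takes longer than the configured delay, the tasks are restarted immediately after cancellation, so the effective pause between "tasks cancelled" and "tasks restarted" can be zero. Variant B always leaves the full configured pause after cancellation.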
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r324505814 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -0,0 +1,727 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.TaskFailureListener; +import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.SerializedInputSplit; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter; +import
zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323755152 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java ## @@ -23,11 +23,19 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.executiongraph.failover.flip1.NeverRestartBackoffTimeStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; Review comment: Agreed. I think we should rename them when removing the legacy restart strategies and failover strategies, possibly in 1.11.