[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-25 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327972417
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/NeverRestartBackoffTimeStrategy.java
 ##
 @@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+/**
+ * Restart backoff time strategy that never restarts.
+ */
+public enum NeverRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
 
 Review comment:
   PR#8912 also introduces a NoRestartBackoffTimeStrategy.
   I think we can drop this class once PR#8912 is merged, and update the restart strategy loading accordingly.
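
For reference, a stateless "never restart" strategy fits naturally into an enum singleton, so whichever of the two classes survives can be handed out as one shared instance by the restart strategy loading. A minimal sketch; `RestartBackoffStrategy` below is a hypothetical stand-in, and its three methods are an assumption modeled on the idea of a restart backoff time strategy, not Flink's actual interface:

```java
public class NoRestartSketch {

    // Hypothetical stand-in for a restart backoff time strategy interface.
    interface RestartBackoffStrategy {
        boolean canRestart();

        long getBackoffTime();

        void notifyFailure(Throwable cause);
    }

    // Stateless, so a single enum constant can be shared by every job.
    enum NoRestartStrategy implements RestartBackoffStrategy {
        INSTANCE;

        @Override
        public boolean canRestart() {
            return false; // never restart
        }

        @Override
        public long getBackoffTime() {
            return 0L; // irrelevant, since no restart happens
        }

        @Override
        public void notifyFailure(Throwable cause) {
            // nothing to record
        }
    }

    public static void main(String[] args) {
        RestartBackoffStrategy strategy = NoRestartStrategy.INSTANCE;
        strategy.notifyFailure(new RuntimeException("task failed"));
        System.out.println("canRestart = " + strategy.canRestart());
    }
}
```

Because the enum carries no state, `notifyFailure` can ignore its argument; `canRestart()` returning `false` is the only behavior the failure handling depends on.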


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-25 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327982708
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This class is a tuple holding the information necessary to deploy an {@link ExecutionVertex}.
+ *
+ * <p>The tuple consists of:
+ * <ul>
+ *     <li>{@link ExecutionVertexVersion}
+ *     <li>{@link ExecutionVertexDeploymentOption}
+ *     <li>{@link SlotExecutionVertexAssignment}
+ * </ul>
+ */
+class DeploymentHandle {
+
+   private final ExecutionVertexVersion requiredVertexVersion;
+
+   private final ExecutionVertexDeploymentOption executionVertexDeploymentOption;
+
+   private final SlotExecutionVertexAssignment slotExecutionVertexAssignment;
+
+   public DeploymentHandle(
+   final ExecutionVertexVersion requiredVertexVersion,
+   final ExecutionVertexDeploymentOption executionVertexDeploymentOption,
+   final SlotExecutionVertexAssignment slotExecutionVertexAssignment) {
+
+   this.requiredVertexVersion = Preconditions.checkNotNull(requiredVertexVersion);
+   this.executionVertexDeploymentOption = Preconditions.checkNotNull(executionVertexDeploymentOption);
+   this.slotExecutionVertexAssignment = Preconditions.checkNotNull(slotExecutionVertexAssignment);
+   }
+
+   public ExecutionVertexID getExecutionVertexId() {
+   return requiredVertexVersion.getExecutionVertexId();
+   }
+
+   public ExecutionVertexVersion getRequiredVertexVersion() {
+   return requiredVertexVersion;
+   }
+
+   public DeploymentOption getDeploymentOption() {
+   return executionVertexDeploymentOption.getDeploymentOption();
+   }
+
+   public SlotExecutionVertexAssignment getSlotExecutionVertexAssignment() {
+   return slotExecutionVertexAssignment;
+   }
+
+   public LogicalSlot getLogicalSlot() {
+   final LogicalSlot logicalSlot = slotExecutionVertexAssignment.getLogicalSlotFuture().getNow(null);
 
 Review comment:
   This method is currently called in DefaultScheduler#stopDeployment.
   The future may have completed exceptionally, e.g. when the slot allocation timed out. In that case getNow(null) throws a CompletionException, which may then cause a fatal error.
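
To illustrate the concern: `CompletableFuture#getNow(null)` returns `null` for a pending future, but throws a `CompletionException` if the future completed exceptionally. A minimal sketch of a defensive lookup, assuming that "no slot" is an acceptable result when stopping a deployment (`getIfCompletedNormally` is a hypothetical helper, not existing Flink code):

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

public class SafeSlotLookup {

    // Returns the value only if the future already completed normally.
    // Pending, failed, and cancelled futures all yield Optional.empty().
    static <T> Optional<T> getIfCompletedNormally(CompletableFuture<T> future) {
        if (future.isDone() && !future.isCompletedExceptionally()) {
            // Safe: a normally completed future never throws from getNow.
            return Optional.ofNullable(future.getNow(null));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        CompletableFuture<String> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("slot allocation timed out"));

        try {
            timedOut.getNow(null); // the pattern used in getLogicalSlot()
        } catch (CompletionException e) {
            System.out.println("getNow threw, cause: " + e.getCause());
        }

        System.out.println("safe lookup: " + getIfCompletedNormally(timedOut));
    }
}
```

`isCompletedExceptionally()` also returns `true` for cancelled futures, so cancellation is covered by the same check.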


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-25 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327972417
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/NeverRestartBackoffTimeStrategy.java
 ##
 @@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+/**
+ * Restart backoff time strategy that never restarts.
+ */
+public enum NeverRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
 
 Review comment:
   PR#8912 also introduces a NoRestartBackoffTimeStrategy.
   We may drop this class once PR#8912 is merged, and update the restart strategy loading accordingly.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-25 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327979650
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,292 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+   this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
 
 Review comment:
   It would be helpful to log which scheduling strategy is used.
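
A minimal sketch of such a log line, using `java.util.logging` only to stay self-contained (Flink itself logs through SLF4J); `SchedulingStrategy`, `EagerStrategyStub`, and the message format are hypothetical:

```java
import java.util.logging.Logger;

public class StrategyLogging {

    private static final Logger LOG = Logger.getLogger(StrategyLogging.class.getName());

    // Hypothetical minimal scheduling strategy type.
    interface SchedulingStrategy {
        void startScheduling();
    }

    // Hypothetical stub standing in for a concrete strategy.
    static final class EagerStrategyStub implements SchedulingStrategy {
        @Override
        public void startScheduling() {
            // no-op for the sketch
        }
    }

    // Builds the log message separately so it is easy to test.
    static String describe(SchedulingStrategy strategy, String jobName, String jobId) {
        return String.format("Using scheduling strategy %s for %s (%s).",
            strategy.getClass().getSimpleName(), jobName, jobId);
    }

    public static void main(String[] args) {
        SchedulingStrategy strategy = new EagerStrategyStub();
        LOG.info(describe(strategy, "WordCount", "job-42"));
        strategy.startScheduling();
    }
}
```

Logging the class name rather than relying on `toString()` keeps the message informative even for strategies that do not override `toString()`.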


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-24 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327709276
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1129,6 +1157,9 @@ void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID fai
 * @param t The exception that caused the failure.
 */
public void failGlobal(Throwable t) {
+   if (!isLegacyScheduling()) {
+   ExceptionUtils.rethrow(t);
 
 Review comment:
   Ok.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-24 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327494085
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+   this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+   this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+   setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+   }
+
+   // ------------------------------------------------------------------------
+   // SchedulerNG
+   // ------------------------------------------------------------------------
+
+   @Override
+   protected void startSchedulingInternal() {
+   prepareExecutionGraphForScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+   final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+   final Throwable error = taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+   final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+   final Set<ExecutionVertexVersion> executionVertexVersions =
+   new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
+   () -> FutureUtils.assertNoException(
+   cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+   failureHandlingResult.getRestartDelayMS(),
+   TimeUnit.MILLISECONDS);
+   }
+
+   private BiFunction<Object, Throwable, Void> restartTasksOrHandleError(final Set<ExecutionVertexVersion> executionVertexVersions) {
+   return (Object ignored, Throwable throwable) -> {
+
+   if (throwable == null) {
+   final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+   schedulingStrategy.restartTasks(verticesToRestart);
+   } else {
+   failJob(throwable);
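
The restart path in this hunk relies on vertex versioning: versions are recorded when the failure is handled, and after the restart delay only the vertices whose versions are still unchanged get restarted, so a restart made stale by a newer modification is skipped. A simplified, hypothetical sketch of that bookkeeping (`Versioner` and `VertexVersion` are illustrative stand-ins, not the `ExecutionVertexVersioner` API):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class VersionerSketch {

    // Hypothetical (vertexId, version) pair.
    static final class VertexVersion {
        final String vertexId;
        final long version;

        VertexVersion(String vertexId, long version) {
            this.vertexId = vertexId;
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof VertexVersion)) {
                return false;
            }
            VertexVersion other = (VertexVersion) o;
            return vertexId.equals(other.vertexId) && version == other.version;
        }

        @Override
        public int hashCode() {
            return Objects.hash(vertexId, version);
        }
    }

    // Hypothetical, simplified versioner: one counter per vertex.
    static final class Versioner {
        private final Map<String, Long> currentVersions = new HashMap<>();

        // Bumps the version of each vertex and returns the new versions.
        Set<VertexVersion> recordModifications(Set<String> vertexIds) {
            Set<VertexVersion> recorded = new HashSet<>();
            for (String vertexId : vertexIds) {
                long next = currentVersions.merge(vertexId, 1L, Long::sum);
                recorded.add(new VertexVersion(vertexId, next));
            }
            return recorded;
        }

        // Returns the vertices not modified again since their versions were recorded.
        Set<String> getUnmodified(Set<VertexVersion> recorded) {
            Set<String> unmodified = new HashSet<>();
            for (VertexVersion v : recorded) {
                if (currentVersions.getOrDefault(v.vertexId, 0L) == v.version) {
                    unmodified.add(v.vertexId);
                }
            }
            return unmodified;
        }
    }

    public static void main(String[] args) {
        Versioner versioner = new Versioner();
        // First failure: restart v1 and v2 after a delay.
        Set<VertexVersion> firstRestart = versioner.recordModifications(new HashSet<>(Arrays.asList("v1", "v2")));
        // A second failure modifies v2 before the delayed restart fires.
        versioner.recordModifications(Collections.singleton("v2"));
        // The delayed restart of the first failure now only covers v1.
        System.out.println(versioner.getUnmodified(firstRestart));
    }
}
```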
 
 

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-24 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327446705
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+   this.slotRequestTimeout = slotRequestTimeout;
+   this.slotProvider = slotProvider;
+   this.delayExecutor = delayExecutor;
+   this.userCodeLoader = userCodeLoader;
+   this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory);
+   this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory);
+   this.executionVertexOperations = checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = executionVertexVersioner;
+   this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner);
+   }
+
+   // ------------------------------------------------------------------------
+   // SchedulerNG
+   // ------------------------------------------------------------------------
+
+   @Override
+   public void startSchedulingInternal() {
+   initializeScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   private void initializeScheduling() {
+   executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+   schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+   executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+   setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID()));
+   prepareExecutionGraphForScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+   final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, executionVertexId);
 
 Review comment:
   I think it's fine not to change it at the moment, given that:
   1. Currently, schedulingStrategy.onExecutionStateChange() is only invoked for the FAILED state internally, and for the RUNNING/FINISHED/CANCELED/FAILED states reported from the TM side.
   2. The RUNNING/CANCELED/FAILED states do not trigger scheduling in the eager/lazy scheduling strategies.
   3. The FINISHED state is currently used for lazy scheduling, but that may change as discussed [in this comment](https://github.com/apache/flink/pull/9663#discussion_r326540913).
   
   We can reconsider this later, when we redefine the scheduling strategy interfaces or refine the invocations as discussed in [the other comment](https://github.com/apache/flink/pull/9663/files#r326897974).
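
Points 1 to 3 can be illustrated with a deliberately simplified lazy strategy that only reacts to FINISHED, so RUNNING/CANCELED/FAILED notifications are no-ops. The `State` enum, the producer-to-consumer map, and `LazyStrategy` are hypothetical and much simpler than the real lazy-from-sources strategy:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LazyStrategySketch {

    // Reduced, hypothetical subset of execution states.
    enum State { RUNNING, FINISHED, CANCELED, FAILED }

    // Hypothetical lazy strategy: schedules consumers when a producer finishes.
    static final class LazyStrategy {
        private final Map<String, List<String>> consumersByProducer;
        private final List<String> scheduled = new ArrayList<>();

        LazyStrategy(Map<String, List<String>> consumersByProducer) {
            this.consumersByProducer = consumersByProducer;
        }

        void onExecutionStateChange(String vertexId, State state) {
            // RUNNING, CANCELED and FAILED do not trigger any scheduling here.
            if (state != State.FINISHED) {
                return;
            }
            scheduled.addAll(consumersByProducer.getOrDefault(vertexId, Collections.emptyList()));
        }

        List<String> getScheduled() {
            return scheduled;
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("source", Arrays.asList("map", "sink"));

        LazyStrategy strategy = new LazyStrategy(graph);
        strategy.onExecutionStateChange("source", State.RUNNING);
        strategy.onExecutionStateChange("source", State.FINISHED);
        System.out.println(strategy.getScheduled());
    }
}
```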
   


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327421441
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1129,6 +1157,9 @@ void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID fai
 * @param t The exception that caused the failure.
 */
public void failGlobal(Throwable t) {
+   if (!isLegacyScheduling()) {
+   ExceptionUtils.rethrow(t);
 
 Review comment:
   > For real consistency issues caused by bugs, illegal state transitions, etc. I think it is reasonable to terminate the JVM.
   
   I think it's reasonable, but I'm not sure whether this change could cause many more JM failovers than before. This could be an even more severe problem for a session cluster, which holds multiple jobs in one Dispatcher.
   
   > One could propagate the exception to the JobMaster and in the RPC framework check for unwanted uncaught exceptions in specially annotated RPC methods.
   
   If we'd like to terminate the JVM, why not do it right where the fatal error happens?
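
For context, the hunk makes `failGlobal` rethrow the cause when legacy scheduling is disabled. A minimal sketch of what such a rethrow does to the exception type, modeled on the behavior I understand `org.apache.flink.util.ExceptionUtils#rethrow(Throwable)` to have (errors and unchecked exceptions pass through, checked exceptions get wrapped); `RethrowSketch` is hypothetical:

```java
import java.io.IOException;

public class RethrowSketch {

    // Errors and unchecked exceptions are rethrown unchanged;
    // checked exceptions are wrapped in a RuntimeException.
    static void rethrow(Throwable t) {
        if (t instanceof Error) {
            throw (Error) t;
        } else if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RuntimeException(t);
        }
    }

    public static void main(String[] args) {
        try {
            rethrow(new IOException("inconsistent state"));
        } catch (RuntimeException e) {
            // The original checked exception is preserved as the cause.
            System.out.println("wrapped cause: " + e.getCause());
        }
    }
}
```

Either way the exception propagates to the caller of `failGlobal`; the open question in this thread is whether that caller or the failing site itself should decide to terminate the JVM.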


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327418704
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327410288
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327150696
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Operations on the ExecutionVertex.
+ */
+interface ExecutionVertexOperations {
+
+   void deploy(ExecutionVertex executionVertex) throws JobException;
 
 Review comment:
   Ok.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327147478
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1485,6 +1525,20 @@ else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failu
}
}
 
+   public void failJob(Throwable cause) {
+   if (state == JobStatus.FAILING || state.isGloballyTerminalState()) {
+   return;
+   }
+
+   transitionState(JobStatus.FAILING);
+   initFailureCause(cause);
+
+   cancelVerticesAsync().whenComplete((aVoid, throwable) -> {
+   transitionState(JobStatus.FAILED);
 
 Review comment:
   Oh, I see. I think we can do that for the transition to `FAILING` as well.
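
The `failJob` flow in this hunk can be sketched as a small state machine: return early if already FAILING or terminal, transition to FAILING, cancel asynchronously, and transition to FAILED once cancellation completes. In this hypothetical sketch both transitions go through the same checked `transitionState` helper; whether that matches the suggestion being replied to is an assumption, `JobStatus` here is a reduced stand-in, and single-threaded access is assumed:

```java
import java.util.concurrent.CompletableFuture;

public class FailJobSketch {

    // Reduced, hypothetical job status enum.
    enum JobStatus {
        RUNNING, FAILING, FAILED, CANCELED, FINISHED;

        boolean isGloballyTerminalState() {
            return this == FAILED || this == CANCELED || this == FINISHED;
        }
    }

    private final CompletableFuture<Void> cancellation = new CompletableFuture<>();
    private JobStatus state = JobStatus.RUNNING;
    private Throwable failureCause;

    void failJob(Throwable cause) {
        // Repeated failures while failing, or after termination, are ignored.
        if (state == JobStatus.FAILING || state.isGloballyTerminalState()) {
            return;
        }
        transitionState(JobStatus.FAILING);
        failureCause = cause;
        cancelVerticesAsync().whenComplete((ignored, throwable) -> transitionState(JobStatus.FAILED));
    }

    // Hypothetical stand-in for asynchronously cancelling all vertices.
    CompletableFuture<Void> cancelVerticesAsync() {
        return cancellation;
    }

    void completeCancellation() {
        cancellation.complete(null);
    }

    // Checked transition used for both FAILING and FAILED. If this throws inside
    // whenComplete, the exception only completes the derived future exceptionally.
    private void transitionState(JobStatus newState) {
        boolean legal = (state == JobStatus.RUNNING && newState == JobStatus.FAILING)
            || (state == JobStatus.FAILING && newState == JobStatus.FAILED);
        if (!legal) {
            throw new IllegalStateException("Illegal transition " + state + " -> " + newState);
        }
        state = newState;
    }

    JobStatus getState() {
        return state;
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    public static void main(String[] args) {
        FailJobSketch job = new FailJobSketch();
        job.failJob(new RuntimeException("task failed"));
        job.failJob(new RuntimeException("ignored second failure"));
        job.completeCancellation();
        System.out.println(job.getState() + " caused by " + job.getFailureCause().getMessage());
    }
}
```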


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327089390
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+   this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+   this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+   setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+   }
+
+   // ------------------------------------------------------------------------
+   // SchedulerNG
+   // ------------------------------------------------------------------------
+
+   @Override
+   protected void startSchedulingInternal() {
+   prepareExecutionGraphForScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+   final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+   final Throwable error = taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult 
failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult 
failureHandlingResult) {
+   final Set verticesToRestart = 
failureHandlingResult.getVerticesToRestart();
+
+   final Set executionVertexVersions =
+   new 
HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture cancelFuture = 
cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
+   () -> FutureUtils.assertNoException(
+   
cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), 
getMainThreadExecutor())),
+   failureHandlingResult.getRestartDelayMS(),
+   TimeUnit.MILLISECONDS);
+   }
+
+   private BiFunction 
restartTasksOrHandleError(final Set 
executionVertexVersions) {
+   return (Object ignored, Throwable throwable) -> {
+
+   if (throwable == null) {
+   final Set verticesToRestart 
= 
executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+   
schedulingStrategy.restartTasks(verticesToRestart);
+   } else {
+   failJob(throwable);
 
 

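`restartTasksWithDelay` records a new version for every vertex it is about to restart and, once the delay has elapsed, restarts only the vertices that `getUnmodifiedExecutionVertices` still reports as unmodified. The idea can be sketched with `String` vertex IDs; `VertexVersionerSketch` and its `Version` type are hypothetical and do not reproduce Flink's `ExecutionVertexVersioner`:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Hypothetical, simplified versioner: a restart scheduled earlier only applies
 * to vertices that were not modified again in the meantime.
 */
final class VertexVersionerSketch {

    /** Immutable (vertexId, version) pair. */
    static final class Version {
        final String vertexId;
        final long version;

        Version(String vertexId, long version) {
            this.vertexId = vertexId;
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Version)) {
                return false;
            }
            Version other = (Version) o;
            return vertexId.equals(other.vertexId) && version == other.version;
        }

        @Override
        public int hashCode() {
            return Objects.hash(vertexId, version);
        }
    }

    private final Map<String, Long> currentVersions = new HashMap<>();

    /** Bumps the version of each given vertex (starting at 1) and returns the new versions. */
    Map<String, Version> recordVertexModifications(Set<String> vertexIds) {
        Map<String, Version> recorded = new HashMap<>();
        for (String id : vertexIds) {
            long newVersion = currentVersions.merge(id, 1L, Long::sum);
            recorded.put(id, new Version(id, newVersion));
        }
        return recorded;
    }

    /** Returns the vertices whose current version still equals the recorded one. */
    Set<String> getUnmodifiedExecutionVertices(Set<Version> recordedVersions) {
        Set<String> unmodified = new HashSet<>();
        for (Version recorded : recordedVersions) {
            long current = currentVersions.getOrDefault(recorded.vertexId, 0L);
            if (current == recorded.version) {
                unmodified.add(recorded.vertexId);
            }
        }
        return unmodified;
    }
}
```

In this sketch, if vertex `b` is modified again (for example by a second failure) before the first delayed restart fires, that restart skips `b`, presumably leaving it to the restart triggered by the newer failure.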
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327083068
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -23,40 +23,93 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Stub implementation of the future default scheduler.
+ * The future default scheduler.
  */
-public class DefaultScheduler extends LegacyScheduler {
+public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {
+
+   private final Logger log;
+
+   private final ClassLoader userCodeLoader;
+
+   private final ExecutionSlotAllocator executionSlotAllocator;
+
+   private final ExecutionFailureHandler executionFailureHandler;
+
+   private final ScheduledExecutor delayExecutor;
+
+   private final SchedulingStrategy schedulingStrategy;
+
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   private final ExecutionVertexOperations executionVertexOperations;
 
public DefaultScheduler(
-   final Logger log,
 
 Review comment:
   Fine.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327081463
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 ##
 @@ -44,13 +44,18 @@
     /** Config name for the {@link RestartPipelinedRegionStrategy}. */
     public static final String LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-legacy";
 
+    /** Config name for the {@link NoOpFailoverStrategy} */
+    public static final String NO_OP_FAILOVER_STRATEGY = "noop";
+
     // ------------------------------------------------------------------------
 
     /**
      * Loads a FailoverStrategy Factory from the given configuration.
      */
     public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config, @Nullable Logger logger) {
-        final String strategyParam = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY);
+        final String strategyParam = config.getString(JobManagerOptions.SCHEDULER).equals("ng") ?
+                NO_OP_FAILOVER_STRATEGY :
 
 Review comment:
   I see.
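With the `ng` scheduler, the diff makes `loadFailoverStrategy` choose the `noop` strategy instead of the configured one (the else-branch is cut off in the excerpt), presumably because `DefaultScheduler` handles failover itself through its `ExecutionFailureHandler`. A sketch of that selection; the configuration keys and the `"full"` fallback are assumptions for illustration, and a plain `Map` stands in for Flink's `Configuration`:

```java
import java.util.Map;

/**
 * Hypothetical sketch of the strategy-name selection in the diff above;
 * plain map keys stand in for JobManagerOptions.
 */
final class FailoverStrategySelectionSketch {

    // Assumed configuration keys and fallback value, for illustration only.
    static final String SCHEDULER_KEY = "jobmanager.scheduler";
    static final String FAILOVER_STRATEGY_KEY = "jobmanager.execution.failover-strategy";
    static final String FALLBACK_STRATEGY = "full";

    static final String NO_OP_FAILOVER_STRATEGY = "noop";

    static String selectStrategyName(Map<String, String> config) {
        // "ng" selects the new scheduler; the ExecutionGraph then gets the no-op strategy.
        if ("ng".equals(config.get(SCHEDULER_KEY))) {
            return NO_OP_FAILOVER_STRATEGY;
        }
        // Otherwise keep whatever failover strategy is configured.
        return config.getOrDefault(FAILOVER_STRATEGY_KEY, FALLBACK_STRATEGY);
    }
}
```

Writing the comparison as `"ng".equals(...)` keeps the check null-safe when the scheduler key is absent from the map.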




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327079025
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1485,6 +1525,20 @@ else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failu
         }
     }
 
+    public void failJob(Throwable cause) {
+        if (state == JobStatus.FAILING || state.isGloballyTerminalState()) {
+            return;
+        }
+
+        transitionState(JobStatus.FAILING);
+        initFailureCause(cause);
+
+        cancelVerticesAsync().whenComplete((aVoid, throwable) -> {
+            transitionState(JobStatus.FAILED);
 
 Review comment:
   The transition log will contain the cause if there is one.
   Without the cause, it seems we would not log why the job is FAILING/FAILED?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327076866
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
             slotRequestTimeout,
             shuffleMaster,
             partitionTracker);
+
+        this.log = log;
+
+        this.delayExecutor = checkNotNull(delayExecutor);
+        this.userCodeLoader = checkNotNull(userCodeLoader);
+        this.executionVertexOperations = checkNotNull(executionVertexOperations);
+        this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+        this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+        this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+        this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+        setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+    }
+
+    // ------------------------------------------------------------------------
+    // SchedulerNG
+    // ------------------------------------------------------------------------
+
+    @Override
+    protected void startSchedulingInternal() {
+        prepareExecutionGraphForScheduling();
+        schedulingStrategy.startScheduling();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+        final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+        if (executionVertexIdOptional.isPresent()) {
+            final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+            updateState(taskExecutionState);
+            schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+            maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+            return true;
+        }
+
+        return false;
+    }
+
+    private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) {
+        if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+            final Throwable error = taskExecutionState.getError(userCodeLoader);
+            handleTaskFailure(executionVertexId, error);
+        }
+    }
+
+    private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+        final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+        maybeRestartTasks(failureHandlingResult);
+    }
+
+    private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+        if (failureHandlingResult.canRestart()) {
+            restartTasksWithDelay(failureHandlingResult);
+        } else {
+            failJob(failureHandlingResult.getError());
+        }
+    }
+
+    private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+        final Set verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+        final Set executionVertexVersions =
+            new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+        final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart);
+
+        delayExecutor.schedule(
+            () -> FutureUtils.assertNoException(
+                cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+            failureHandlingResult.getRestartDelayMS(),
+            TimeUnit.MILLISECONDS);
+    }
+
+    private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) {
+        return (Object ignored, Throwable throwable) -> {
+
+            if (throwable == null) {
+                final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+                schedulingStrategy.restartTasks(verticesToRestart);
+            } else {
+                failJob(throwable);
+

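In `restartTasksWithDelay`, cancellation starts immediately, but the reaction to its result (`restartTasksOrHandleError`) is only scheduled after `getRestartDelayMS()` and then runs on the main-thread executor. A JDK-only sketch of that pattern; `DelayedRestartSketch`, `restartOrFail`, and the string results are hypothetical stand-ins for the scheduler's calls:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: wait for the restart delay, then react to the
 * cancellation result on a "main thread" executor.
 */
final class DelayedRestartSketch {

    static CompletableFuture<String> restartWithDelay(
            CompletableFuture<?> cancelFuture,
            ScheduledExecutorService delayExecutor,
            Executor mainThreadExecutor,
            long restartDelayMs) {

        CompletableFuture<String> outcome = new CompletableFuture<>();
        delayExecutor.schedule(() -> {
            // Runs after the delay; the handler itself runs on the main-thread executor.
            cancelFuture
                    .handleAsync(DelayedRestartSketch::restartOrFail, mainThreadExecutor)
                    .thenAccept(outcome::complete);
        }, restartDelayMs, TimeUnit.MILLISECONDS);
        return outcome;
    }

    /** Mirrors restartTasksOrHandleError: restart on success, otherwise fail the job. */
    private static String restartOrFail(Object ignored, Throwable throwable) {
        return throwable == null ? "restart" : "fail: " + throwable.getMessage();
    }
}
```

If the handler threw, `outcome` in this sketch would never complete. In the diff, `FutureUtils.assertNoException` wraps the handler's future, presumably so that such an error is not silently lost.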
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327067464
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
             slotRequestTimeout,
             shuffleMaster,
             partitionTracker);
+
+        this.log = log;
+
+        this.delayExecutor = checkNotNull(delayExecutor);
+        this.userCodeLoader = checkNotNull(userCodeLoader);
+        this.executionVertexOperations = checkNotNull(executionVertexOperations);
+        this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+        this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+        this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+        this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+        setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+    }
+
+    // ------------------------------------------------------------------------
+    // SchedulerNG
+    // ------------------------------------------------------------------------
+
+    @Override
+    protected void startSchedulingInternal() {
+        prepareExecutionGraphForScheduling();
+        schedulingStrategy.startScheduling();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+        final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+        if (executionVertexIdOptional.isPresent()) {
+            final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+            updateState(taskExecutionState);
+            schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+            maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+            return true;
+        }
+
+        return false;
+    }
+
+    private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) {
+        if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+            final Throwable error = taskExecutionState.getError(userCodeLoader);
+            handleTaskFailure(executionVertexId, error);
+        }
+    }
+
+    private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+        final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+        maybeRestartTasks(failureHandlingResult);
+    }
+
+    private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+        if (failureHandlingResult.canRestart()) {
+            restartTasksWithDelay(failureHandlingResult);
+        } else {
+            failJob(failureHandlingResult.getError());
+        }
+    }
+
+    private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+        final Set verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+        final Set executionVertexVersions =
+            new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+        final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart);
+
+        delayExecutor.schedule(
+            () -> FutureUtils.assertNoException(
+                cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+            failureHandlingResult.getRestartDelayMS(),
+            TimeUnit.MILLISECONDS);
+    }
+
+    private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) {
+        return (Object ignored, Throwable throwable) -> {
+
+            if (throwable == null) {
+                final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+                schedulingStrategy.restartTasks(verticesToRestart);
+            } else {
+                failJob(throwable);
 
 

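The branch in `maybeRestartTasks` either restarts the affected vertices after a delay or fails the whole job. In the diff that decision comes from `ExecutionFailureHandler` together with a `RestartBackoffTimeStrategy`; the sketch below replaces both with a hypothetical fixed-delay, bounded-restart policy (`FailureHandlingSketch`, `FixedDelayBackoff`) so the control flow can run on its own:

```java
/**
 * Hypothetical sketch of the decision in maybeRestartTasks: a backoff policy
 * decides whether to restart (and after which delay) or to fail the job.
 */
final class FailureHandlingSketch {

    /** Simplified stand-in for a restart backoff time strategy: fixed delay, bounded restarts. */
    static final class FixedDelayBackoff {
        private final int maxRestarts;
        private final long delayMs;
        private int failures;

        FixedDelayBackoff(int maxRestarts, long delayMs) {
            this.maxRestarts = maxRestarts;
            this.delayMs = delayMs;
        }

        void notifyFailure() {
            failures++;
        }

        boolean canRestart() {
            return failures <= maxRestarts;
        }

        long getBackoffTime() {
            return delayMs;
        }
    }

    /** Returns a description of what the scheduler would do for this failure. */
    static String handleTaskFailure(FixedDelayBackoff backoff, String vertexId) {
        backoff.notifyFailure();
        if (backoff.canRestart()) {
            return "restart " + vertexId + " after " + backoff.getBackoffTime() + " ms";
        }
        return "fail job";
    }
}
```

With `maxRestarts = 2`, the first two failures lead to delayed restarts and the third fails the job.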
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326982305
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
             slotRequestTimeout,
             shuffleMaster,
             partitionTracker);
+
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        this.slotRequestTimeout = slotRequestTimeout;
+        this.slotProvider = slotProvider;
+        this.delayExecutor = delayExecutor;
+        this.userCodeLoader = userCodeLoader;
+        this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory);
+        this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory);
+        this.executionVertexOperations = checkNotNull(executionVertexOperations);
+        this.executionVertexVersioner = executionVertexVersioner;
+        this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner);
+    }
+
+    // ------------------------------------------------------------------------
+    // SchedulerNG
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void startSchedulingInternal() {
+        initializeScheduling();
+        schedulingStrategy.startScheduling();
+    }
+
+    private void initializeScheduling() {
+        executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+        schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+        executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+        setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID()));
+        prepareExecutionGraphForScheduling();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+        final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+        if (executionVertexIdOptional.isPresent()) {
+            final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+            updateState(taskExecutionState);
+            schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+            maybeHandleTaskFailure(taskExecutionState, executionVertexId);
 
 Review comment:
   Exactly.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-23 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326977860
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
+
+    public static ExecutionVertexSchedulingRequirements from(final ExecutionVertex executionVertex) {
+
+        final ExecutionVertexID executionVertexId = new ExecutionVertexID(
+            executionVertex.getJobVertex().getJobVertexId(),
+            executionVertex.getParallelSubtaskIndex());
+
+        final AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
+        final SlotSharingGroup slotSharingGroup = executionVertex.getJobVertex().getSlotSharingGroup();
+
+        return new ExecutionVertexSchedulingRequirements.Builder()
+            .withExecutionVertexId(executionVertexId)
+            .withPreviousAllocationId(latestPriorAllocation)
+            .withSlotSharingGroupId(slotSharingGroup == null ? null : slotSharingGroup.getSlotSharingGroupId())
+            .withCoLocationConstraint(executionVertex.getLocationConstraint())
+            .withPreferredLocations(getPreferredLocationBasedOnState(executionVertex)).build();
 
 Review comment:
   I see. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326899233
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+   this.slotRequestTimeout = slotRequestTimeout;
+   this.slotProvider = slotProvider;
+   this.delayExecutor = delayExecutor;
+   this.userCodeLoader = userCodeLoader;
+   this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory);
+   this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory);
+   this.executionVertexOperations = checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = executionVertexVersioner;
+   this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner);
+   }
+
+   // ------------------------------------------------------------------------
+   // SchedulerNG
+   // ------------------------------------------------------------------------
+
+   @Override
+   public void startSchedulingInternal() {
+   initializeScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   private void initializeScheduling() {
+   executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+   schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+   executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+   setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID()));
+   prepareExecutionGraphForScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+   final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, executionVertexId);
 
 Review comment:
   I think it is problematic that `schedulingStrategy.onExecutionStateChange` may change task state directly in this thread:
* It can produce call chains that are hard to follow, which makes the code hard to maintain.
* A very long call chain may result in a stack overflow.
* When we invoke `maybeHandleTaskFailure` right after `schedulingStrategy.onExecutionStateChange`, the task state may already have changed somewhere in that call chain, so we would be doing failover handling in an unexpected state.
   
   How about defining that `SchedulerOperations#allocateSlotsAndDeploy` does not take effect synchronously when invoked, and executing the actions in `allocateSlotsAndDeploy` later in the main thread instead?
   This way we can assume that no task state change happens while `schedulingStrategy.onExecutionStateChange` is being invoked here.
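   A minimal sketch of the proposal, assuming a single-threaded main-thread executor modeled as a plain queue. `QueueingMainThreadExecutor`, `SketchScheduler`, and the toy "redeploy on CANCELED" strategy are hypothetical and only illustrate the idea: when `allocateSlotsAndDeploy` is deferred, the step that runs right after `onExecutionStateChange` still sees the state that was reported, rather than one changed inside the call chain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for a single-threaded main-thread executor:
// tasks are queued and only run when the event loop drains the queue.
final class QueueingMainThreadExecutor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    void execute(Runnable task) {
        tasks.add(task);
    }

    // Runs queued tasks, including ones enqueued while draining, until empty.
    void drain() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }
}

// Illustrative scheduler, not Flink's DefaultScheduler.
final class SketchScheduler {
    private final QueueingMainThreadExecutor mainThread;
    private final boolean deferDeploy;
    private final Map<String, String> states = new HashMap<>();
    // State of the reported vertex as seen by the failure-handling step.
    final List<String> observedByFailureHandling = new ArrayList<>();

    SketchScheduler(QueueingMainThreadExecutor mainThread, boolean deferDeploy) {
        this.mainThread = mainThread;
        this.deferDeploy = deferDeploy;
    }

    // Same shape as updateTaskExecutionState: update, notify the strategy,
    // then run the stand-in failure handling on the vertex's current state.
    void updateTaskExecutionState(String vertex, String newState) {
        states.put(vertex, newState);
        onExecutionStateChange(vertex, newState);
        observedByFailureHandling.add(states.get(vertex));
    }

    // Toy strategy: redeploy a vertex once it reports CANCELED.
    private void onExecutionStateChange(String vertex, String newState) {
        if ("CANCELED".equals(newState)) {
            allocateSlotsAndDeploy(vertex);
        }
    }

    void allocateSlotsAndDeploy(String vertex) {
        Runnable deploy = () -> states.put(vertex, "DEPLOYING");
        if (deferDeploy) {
            mainThread.execute(deploy); // takes effect in a later main-thread iteration
        } else {
            deploy.run(); // changes state inside the current call chain
        }
    }

    String stateOf(String vertex) {
        return states.get(vertex);
    }
}
```

   With `deferDeploy = false` the failure-handling step observes `DEPLOYING`; with `deferDeploy = true` it observes `CANCELED`, and the deployment only happens once the main thread drains its queue.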




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326901044
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
+
+   public static ExecutionVertexSchedulingRequirements from(final ExecutionVertex executionVertex) {
+
+   final ExecutionVertexID executionVertexId = new ExecutionVertexID(
+   executionVertex.getJobVertex().getJobVertexId(),
+   executionVertex.getParallelSubtaskIndex());
+
+   final AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
+   final SlotSharingGroup slotSharingGroup = executionVertex.getJobVertex().getSlotSharingGroup();
+
+   return new ExecutionVertexSchedulingRequirements.Builder()
+   .withExecutionVertexId(executionVertexId)
+   .withPreviousAllocationId(latestPriorAllocation)
+   .withSlotSharingGroupId(slotSharingGroup == null ? null : slotSharingGroup.getSlotSharingGroupId())
+   .withCoLocationConstraint(executionVertex.getLocationConstraint())
+   .withPreferredLocations(getPreferredLocationBasedOnState(executionVertex)).build();
 
 Review comment:
   In the legacy scheduler, the preferred locations from `getPreferredLocationsBasedOnInputs` are used if `getPreferredLocationsBasedOnState` returns null.
   Are we deprecating `location preferences based on inputs` intentionally?
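   For reference, a minimal sketch of the legacy fallback described above, using a hypothetical simplified `LocationSource` (locations are plain host-name strings, not Flink's `TaskManagerLocation` futures): state-based preferences win, and input-based preferences are used only when the state-based lookup returns null.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical simplified view of a vertex; "locations" are plain host names.
interface LocationSource {
    // May return null when there is no state-based preference, mirroring
    // the legacy convention described in the comment.
    Collection<String> preferredLocationsBasedOnState();

    Collection<String> preferredLocationsBasedOnInputs();
}

final class PreferredLocations {
    private PreferredLocations() {
    }

    // State-based preferences win; input-based preferences are the fallback.
    static Collection<String> resolve(LocationSource vertex) {
        Collection<String> basedOnState = vertex.preferredLocationsBasedOnState();
        if (basedOnState != null) {
            return basedOnState;
        }
        Collection<String> basedOnInputs = vertex.preferredLocationsBasedOnInputs();
        return basedOnInputs != null ? basedOnInputs : Collections.<String>emptyList();
    }
}
```

   Passing only the state-based result to `withPreferredLocations`, as the mapper above does, drops the second branch.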




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326897974
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy);
+   this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
+   this.slotProvider = checkNotNull(slotProvider);
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory);
+   this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory);
+   this.executionVertexOperations = checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+   this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner);
}
 
+   // ------------------------------------------------------------------------
+   // SchedulerNG
+   // ------------------------------------------------------------------------
+
@Override
-   public void startScheduling() {
-   throw new UnsupportedOperationException();
+   public void startSchedulingInternal() {
+   initializeScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   private void initializeScheduling() {
+   executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+   schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+   executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+   setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID()));
+   prepareExecutionGraphForScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+   final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
 
 Review comment:
   By design, execution state changes should be notified via `SchedulingStrategy#onExecutionStateChange`.
   Similarly, all partition consumable state changes should be notified via `SchedulingStrategy#onPartitionConsumable`, as discussed in [another comment](https://github.com/apache/flink/pull/9663/files#r326540913).
   This is how we define this public interface, and I think we need to guarantee its correctness eventually.
   
   You are right that it is not a problem at the moment, i.e. before we mark it as a public interface for users or other developers to customize the scheduling strategy.
   So I think we can skip it in this PR and do it in a follow-up task.
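   A minimal sketch of the contract, using hypothetical simplified types (String IDs instead of `ExecutionVertexID`/`ResultPartitionID`; `StrategyCallbacks`, `RecordingStrategy`, and `NotifyingScheduler` are illustrative, not Flink classes). For illustration only, it assumes a blocking partition becomes consumable once its producer finishes, and routes both notifications through a single scheduler entry point so that a recording strategy can check that none are skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the SchedulingStrategy callbacks named above.
interface StrategyCallbacks {
    void onExecutionStateChange(String vertexId, String executionState);

    void onPartitionConsumable(String vertexId, String partitionId);
}

// Records every notification so a test can verify nothing bypasses it.
final class RecordingStrategy implements StrategyCallbacks {
    final List<String> notifications = new ArrayList<>();

    @Override
    public void onExecutionStateChange(String vertexId, String executionState) {
        notifications.add("state:" + vertexId + "=" + executionState);
    }

    @Override
    public void onPartitionConsumable(String vertexId, String partitionId) {
        notifications.add("consumable:" + partitionId);
    }
}

// Hypothetical single entry point for state updates: every change is
// forwarded to the strategy, and consumability is derived here too.
final class NotifyingScheduler {
    private final StrategyCallbacks strategy;

    NotifyingScheduler(StrategyCallbacks strategy) {
        this.strategy = strategy;
    }

    void updateState(String vertexId, String executionState, String producedBlockingPartition) {
        strategy.onExecutionStateChange(vertexId, executionState);
        // Assumption for illustration: blocking partitions become consumable on FINISHED.
        if ("FINISHED".equals(executionState) && producedBlockingPartition != null) {
            strategy.onPartitionConsumable(vertexId, producedBlockingPartition);
        }
    }
}
```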




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326894440
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
+
+   public static ExecutionVertexSchedulingRequirements from(final ExecutionVertex executionVertex) {
+
+   final ExecutionVertexID executionVertexId = new ExecutionVertexID(
 
 Review comment:
   `ExecutionVertex` now caches its `ExecutionVertexID`, so we can use it directly.
   
   I also noticed that some other NG scheduling/failover classes still create `ExecutionVertexID` themselves. I think we can fix them in a hotfix.
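   A minimal sketch of the caching pattern, with hypothetical simplified classes (`VertexId` stands in for `ExecutionVertexID`, `SketchExecutionVertex` for `ExecutionVertex`, and `getId()` is illustrative): the ID is built once in the constructor, so callers reuse the same value instead of re-deriving it from the job vertex ID and subtask index.

```java
import java.util.Objects;

// Simplified value type: job vertex id + subtask index.
final class VertexId {
    private final String jobVertexId;
    private final int subtaskIndex;

    VertexId(String jobVertexId, int subtaskIndex) {
        this.jobVertexId = Objects.requireNonNull(jobVertexId);
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof VertexId)) {
            return false;
        }
        VertexId other = (VertexId) o;
        return subtaskIndex == other.subtaskIndex && jobVertexId.equals(other.jobVertexId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobVertexId, subtaskIndex);
    }
}

final class SketchExecutionVertex {
    private final VertexId id;

    SketchExecutionVertex(String jobVertexId, int subtaskIndex) {
        // Built exactly once; callers reuse this instance.
        this.id = new VertexId(jobVertexId, subtaskIndex);
    }

    VertexId getId() {
        return id;
    }
}
```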




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326894495
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import 

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326891379
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1485,6 +1525,20 @@ else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failu
}
}
 
+   public void failJob(Throwable cause) {
+   if (state == JobStatus.FAILING || state.isGloballyTerminalState()) {
+   return;
+   }
+
+   transitionState(JobStatus.FAILING);
+   initFailureCause(cause);
+
+   cancelVerticesAsync().whenComplete((aVoid, throwable) -> {
+   transitionState(JobStatus.FAILED);
 
 Review comment:
   How about using `transitionState(state, JobStatus.FAILED, cause)`?
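   A minimal sketch of why passing the expected current state and the cause can help, assuming the three-argument `transitionState` behaves like a compare-and-set: the transition to FAILED only happens if the job is still in the expected state when the cancellation future completes, and the cause is recorded with it. `SketchJobStatus` and `SketchJobStateMachine` are hypothetical; the exact semantics of `ExecutionGraph#transitionState` should be checked in its source.

```java
import java.util.concurrent.atomic.AtomicReference;

enum SketchJobStatus { RUNNING, FAILING, FAILED }

final class SketchJobStateMachine {
    private final AtomicReference<SketchJobStatus> state =
            new AtomicReference<>(SketchJobStatus.RUNNING);
    private volatile Throwable failureCause;

    // Transitions only if the current state still equals `expected`,
    // and records the cause together with a successful transition.
    boolean transitionState(SketchJobStatus expected, SketchJobStatus newState, Throwable cause) {
        if (state.compareAndSet(expected, newState)) {
            if (cause != null) {
                failureCause = cause;
            }
            return true;
        }
        return false;
    }

    SketchJobStatus getState() {
        return state.get();
    }

    Throwable getFailureCause() {
        return failureCause;
    }
}
```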




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326892524
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1129,6 +1157,9 @@ void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID fai
 * @param t The exception that caused the failure.
 */
public void failGlobal(Throwable t) {
+   if (!isLegacyScheduling()) {
+   ExceptionUtils.rethrow(t);
 
 Review comment:
   Re-throwing this error may not work in some cases.
   `failGlobal` is used as a safety net to recover the job from an unexpected, inconsistent state. As I understand it, this method is not intended to throw any exceptions, and many of its callers do not handle exceptions from it.
   
   One problem I can identify is that `CheckpointFailureManager` would break with this change, as it depends on `failGlobal`.
   Another possible issue is that errors occurring while applying execution state updates from the TM previously led to `failGlobal` for recovery. With this change, these errors will instead cause the RPC to return an error response.
   
   Even if we rework all the entry points to `failGlobal` that are reachable with the DefaultScheduler, we still need a way to handle these errors.
   Maybe we need to implement a `failGlobal` mechanism that works for the DefaultScheduler.
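   A rough sketch of one possible direction, not a design: `failGlobal` keeps its non-throwing contract for callers such as checkpoint failure handling, and hands the error to a handler on the main thread (which, in the DefaultScheduler, could trigger a global failover). `GlobalFailureRouter` and its constructor parameters are hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class GlobalFailureRouter {
    private final Executor mainThreadExecutor;
    private final Consumer<Throwable> globalFailureHandler;

    GlobalFailureRouter(Executor mainThreadExecutor, Consumer<Throwable> globalFailureHandler) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.globalFailureHandler = globalFailureHandler;
    }

    // Never throws to the caller; the actual failure handling runs
    // asynchronously on the main thread.
    void failGlobal(Throwable cause) {
        mainThreadExecutor.execute(() -> globalFailureHandler.accept(cause));
    }
}
```

   Callers keep working unchanged, and the RPC that delivered a TM state update would not turn into an error response just because recovery was needed.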
   




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326888403
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+   this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+   this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+   setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+   }
+
+   // ------------------------------------------------------------------------
+   // SchedulerNG
+   // ------------------------------------------------------------------------
+
+   @Override
+   protected void startSchedulingInternal() {
+   prepareExecutionGraphForScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+   final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+   final Throwable error = taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+   final Set verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+   final Set executionVertexVersions =
+   new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
+   () -> FutureUtils.assertNoException(
+   cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+   failureHandlingResult.getRestartDelayMS(),
+   TimeUnit.MILLISECONDS);
+   }
+
+   private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) {
+   return (Object ignored, Throwable throwable) -> {
+
+   if (throwable == null) {
+   final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+   schedulingStrategy.restartTasks(verticesToRestart);
+   } else {
+   failJob(throwable);
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326886263
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Operations on the ExecutionVertex.
+ */
+interface ExecutionVertexOperations {
+
+   void deploy(ExecutionVertex executionVertex) throws JobException;
 
 Review comment:
   An `ExecutionVertexID -> ExecutionVertex` function which references 
`SchedulerBase#getExecutionVertex` would be enough, rather than the entire 
`ExecutionGraph`. 
   But that requires the `DefaultScheduler` to be responsible for creating 
`DefaultExecutionVertexOperations`, which makes it harder to test the scheduler 
with `FailingExecutionVertexOperationsDecorator`.
   
   One solution I can come up with is to use a factory to create 
**ExecutionVertexOperations**. I gave it a try in [this 
commit](https://github.com/zhuzhurk/flink/commit/21b09575fb5818c76313e6a73c8a826626e1bc0a).
   However, the changes were larger than I expected, so it is up to you whether 
we should make them.
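   A minimal sketch of the factory idea, assuming simplified stand-ins: `Vertex` for `ExecutionVertex`, `String` ids for `ExecutionVertexID`, and hypothetical `VertexOperations`, `VertexOperationsFactory` and `Scheduler` types. It is not the code from the linked commit. The scheduler passes only its `id -> vertex` lookup to the factory, so the operations never see the whole `ExecutionGraph`, while a test can still supply a factory that wraps the default operations in a failing decorator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class OperationsFactorySketch {

    /** Hypothetical stand-in for ExecutionVertex. */
    static final class Vertex {
        boolean deployed;
    }

    /** Operations address vertices by id and resolve them through a lookup function. */
    interface VertexOperations {
        void deploy(String vertexId);
    }

    /** Hypothetical factory: the scheduler hands over its own id -> vertex lookup. */
    interface VertexOperationsFactory {
        VertexOperations create(Function<String, Vertex> vertexLookup);
    }

    static final class DefaultVertexOperations implements VertexOperations {
        private final Function<String, Vertex> vertexLookup;

        DefaultVertexOperations(Function<String, Vertex> vertexLookup) {
            this.vertexLookup = vertexLookup;
        }

        @Override
        public void deploy(String vertexId) {
            vertexLookup.apply(vertexId).deployed = true;
        }
    }

    /** Test decorator in the spirit of FailingExecutionVertexOperationsDecorator. */
    static final class FailingVertexOperations implements VertexOperations {
        private final VertexOperations delegate;
        boolean failDeployments = true;

        FailingVertexOperations(VertexOperations delegate) {
            this.delegate = delegate;
        }

        @Override
        public void deploy(String vertexId) {
            if (failDeployments) {
                throw new IllegalStateException("Expected test failure deploying " + vertexId);
            }
            delegate.deploy(vertexId);
        }
    }

    /** Simplified scheduler that owns the vertices and builds its operations via the factory. */
    static final class Scheduler {
        private final Map<String, Vertex> vertices = new HashMap<>();
        private final VertexOperations operations;

        Scheduler(VertexOperationsFactory operationsFactory, String... vertexIds) {
            for (String id : vertexIds) {
                vertices.put(id, new Vertex());
            }
            // Only the lookup escapes, analogous to referencing SchedulerBase#getExecutionVertex.
            this.operations = operationsFactory.create(vertices::get);
        }

        /** Returns false instead of propagating the deployment failure. */
        boolean tryDeploy(String vertexId) {
            try {
                operations.deploy(vertexId);
                return true;
            } catch (IllegalStateException e) {
                return false;
            }
        }

        boolean isDeployed(String vertexId) {
            return vertices.get(vertexId).deployed;
        }
    }

    public static void main(String[] args) {
        // Production wiring: default operations.
        Scheduler scheduler = new Scheduler(DefaultVertexOperations::new, "v1");
        System.out.println(scheduler.tryDeploy("v1") + " " + scheduler.isDeployed("v1"));

        // Test wiring: same scheduler, operations wrapped in the failing decorator.
        Scheduler schedulerUnderTest = new Scheduler(
                lookup -> new FailingVertexOperations(new DefaultVertexOperations(lookup)), "v1");
        System.out.println(schedulerUnderTest.tryDeploy("v1") + " " + schedulerUnderTest.isDeployed("v1"));
    }
}
```

   Running `main` prints `true true` for the default wiring and `false false` for the test wiring. The cost is one extra factory interface, in exchange for not handing the operations the entire graph.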
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-22 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326886164
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
 
 Review comment:
   > Checkstyle does not complain about missing Javadocs if the class is not public
   
   I see. I was not aware of that; I thought the build passed because the style 
checks are suppressed for these runtime packages. 


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326644539
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -23,40 +23,93 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import 
org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Stub implementation of the future default scheduler.
+ * The future default scheduler.
  */
-public class DefaultScheduler extends LegacyScheduler {
+public class DefaultScheduler extends SchedulerBase implements 
SchedulerOperations {
+
+   private final Logger log;
+
+   private final ClassLoader userCodeLoader;
+
+   private final ExecutionSlotAllocator executionSlotAllocator;
+
+   private final ExecutionFailureHandler executionFailureHandler;
+
+   private final ScheduledExecutor delayExecutor;
+
+   private final SchedulingStrategy schedulingStrategy;
+
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   private final ExecutionVertexOperations executionVertexOperations;
 
public DefaultScheduler(
-   final Logger log,
 
 Review comment:
   I think the benefit of double indentation is that it keeps the parameters in a 
method declaration from being mixed up with the method body, which has single 
indentation. But, as you did, the confusion can also be reduced by adding an 
empty line between them.
   
   I don't have a strong preference for this style, so it's fine not to 
change it if double indentation is not explicitly required.
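   For comparison, the two layouts side by side in an illustrative class (all names are made up). Both constructors behave identically; only the indentation of the parameter list and the empty line before the body differ.

```java
import java.util.Objects;

class IndentationStyles {

    /** Double indentation: parameters sit deeper than the body statements. */
    static final class DoubleIndented {
        final String name;
        final int parallelism;

        DoubleIndented(
                final String name,
                final int parallelism) {
            this.name = Objects.requireNonNull(name);
            this.parallelism = parallelism;
        }
    }

    /** Single indentation: an empty line separates the parameters from the body. */
    static final class SingleIndented {
        final String name;
        final int parallelism;

        SingleIndented(
            final String name,
            final int parallelism) {

            this.name = Objects.requireNonNull(name);
            this.parallelism = parallelism;
        }
    }

    public static void main(String[] args) {
        DoubleIndented a = new DoubleIndented("source", 4);
        SingleIndented b = new SingleIndented("source", 4);
        // Same behavior, different layout.
        System.out.println(a.name.equals(b.name) && a.parallelism == b.parallelism);
    }
}
```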


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326544032
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = 
checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new 
ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), 
restartBackoffTimeStrategy);
+   this.schedulingStrategy = 
schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), 
getJobGraph());
+   this.executionSlotAllocator = new 
DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), 
slotRequestTimeout);
+   setTaskFailureListener(new 
UpdateSchedulerNgOnInternalTaskFailuresListener(this, 
getJobGraph().getJobID()));
+   }
+
+   // 

+   // SchedulerNG
+   // 

+
+   @Override
+   protected void startSchedulingInternal() {
+   prepareExecutionGraphForScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+   final Optional<ExecutionVertexID> executionVertexIdOptional = 
getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = 
executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   
schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, 
executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState 
taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == 
ExecutionState.FAILED) {
+   final Throwable error = 
taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID 
executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = 
executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult 
failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult 
failureHandlingResult) {
+   final Set<ExecutionVertexID> verticesToRestart = 
failureHandlingResult.getVerticesToRestart();
+
+   final Set<ExecutionVertexVersion> executionVertexVersions =
+   new 
HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture<?> cancelFuture = 
cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
+   () -> FutureUtils.assertNoException(
+   
cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), 
getMainThreadExecutor())),
+   failureHandlingResult.getRestartDelayMS(),
+   TimeUnit.MILLISECONDS);
+   }
+
+   private BiFunction<Object, Throwable, Void> 
restartTasksOrHandleError(final Set<ExecutionVertexVersion> 
executionVertexVersions) {
+   return (Object ignored, Throwable throwable) -> {
+
+   if (throwable == null) {
+   final Set<ExecutionVertexID> verticesToRestart 
= 
executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+   
schedulingStrategy.restartTasks(verticesToRestart);
+   } else {
+   failJob(throwable);
+

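   The restart path in the hunk above records a version for every vertex it is about to restart, cancels the tasks, and after the backoff delay restarts only the vertices whose version has not changed, so a vertex touched by a later failover in the meantime is skipped. A minimal single-threaded sketch of that version check, using a hypothetical `SimpleVertexVersioner` with `String` ids rather than Flink's `ExecutionVertexVersioner`; the cancellation and the delayed execution on the main thread are left out:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SimpleVertexVersioner {

    /** Snapshot of a vertex id together with the version recorded for it. */
    static final class VertexVersion {
        final String vertexId;
        final long version;

        VertexVersion(String vertexId, long version) {
            this.vertexId = vertexId;
            this.version = version;
        }
    }

    private final Map<String, Long> currentVersions = new HashMap<>();

    /** Bumps the version of every given vertex and returns the newly recorded versions. */
    Map<String, VertexVersion> recordVertexModifications(Collection<String> vertexIds) {
        Map<String, VertexVersion> recorded = new HashMap<>();
        for (String vertexId : vertexIds) {
            long newVersion = currentVersions.merge(vertexId, 1L, Long::sum);
            recorded.put(vertexId, new VertexVersion(vertexId, newVersion));
        }
        return recorded;
    }

    /** Returns the vertices not modified since the given versions were recorded. */
    Set<String> getUnmodifiedVertices(Collection<VertexVersion> recordedVersions) {
        Set<String> unmodified = new HashSet<>();
        for (VertexVersion recorded : recordedVersions) {
            if (currentVersions.getOrDefault(recorded.vertexId, 0L) == recorded.version) {
                unmodified.add(recorded.vertexId);
            }
        }
        return unmodified;
    }

    public static void main(String[] args) {
        SimpleVertexVersioner versioner = new SimpleVertexVersioner();

        // First failure: record versions for the vertices to restart after the delay.
        Set<VertexVersion> firstRestart = new HashSet<>(
                versioner.recordVertexModifications(Arrays.asList("a", "b")).values());

        // Before the delayed restart runs, a second failover modifies "b" again.
        versioner.recordVertexModifications(Collections.singletonList("b"));

        // The first restart now only owns "a"; "b" is left to the second failover.
        System.out.println(versioner.getUnmodifiedVertices(firstRestart));
    }
}
```

   Running `main` prints `[a]`.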
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326540913
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = 
checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new 
ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), 
restartBackoffTimeStrategy);
+   this.schedulingStrategy = 
schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), 
getJobGraph());
+   this.executionSlotAllocator = new 
DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), 
slotRequestTimeout);
+   setTaskFailureListener(new 
UpdateSchedulerNgOnInternalTaskFailuresListener(this, 
getJobGraph().getJobID()));
+   }
+
+   // 

+   // SchedulerNG
+   // 

+
+   @Override
+   protected void startSchedulingInternal() {
+   prepareExecutionGraphForScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+   final Optional<ExecutionVertexID> executionVertexIdOptional = 
getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = 
executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   
schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, 
executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState 
taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == 
ExecutionState.FAILED) {
+   final Throwable error = 
taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID 
executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = 
executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult 
failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult 
failureHandlingResult) {
+   final Set<ExecutionVertexID> verticesToRestart = 
failureHandlingResult.getVerticesToRestart();
+
+   final Set<ExecutionVertexVersion> executionVertexVersions =
+   new 
HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture<?> cancelFuture = 
cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
+   () -> FutureUtils.assertNoException(
+   
cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), 
getMainThreadExecutor())),
+   failureHandlingResult.getRestartDelayMS(),
+   TimeUnit.MILLISECONDS);
+   }
+
+   private BiFunction<Object, Throwable, Void> 
restartTasksOrHandleError(final Set<ExecutionVertexVersion> 
executionVertexVersions) {
+   return (Object ignored, Throwable throwable) -> {
+
+   if (throwable == null) {
+   final Set<ExecutionVertexID> verticesToRestart 
= 
executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+   
schedulingStrategy.restartTasks(verticesToRestart);
+   } else {
+   failJob(throwable);
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326523150
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
+
+   public static ExecutionVertexSchedulingRequirements from(final 
ExecutionVertex executionVertex) {
+
+   final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(
+   executionVertex.getJobVertex().getJobVertexId(),
+   executionVertex.getParallelSubtaskIndex());
+
+   final AllocationID latestPriorAllocation = 
executionVertex.getLatestPriorAllocation();
+   final SlotSharingGroup slotSharingGroup = 
executionVertex.getJobVertex().getSlotSharingGroup();
+
+   return new ExecutionVertexSchedulingRequirements.Builder()
+   .withExecutionVertexId(executionVertexId)
+   .withPreviousAllocationId(latestPriorAllocation)
 
 Review comment:
   The resource profile is not set.
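   A sketch of how a builder can make such an omission fail fast instead of silently falling back to a default. `Requirements`, `Resources` and the `with...` methods are hypothetical simplifications, not the actual `ExecutionVertexSchedulingRequirements.Builder` or Flink's `ResourceProfile`; whether the real builder should require the field is a design choice for this PR.

```java
import java.util.Objects;

class RequirementsBuilderSketch {

    /** Hypothetical, simplified stand-in for a resource profile. */
    static final class Resources {
        final double cpuCores;
        final int memoryMb;

        Resources(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    /** Hypothetical scheduling requirements with a required resource field. */
    static final class Requirements {
        final String vertexId;
        final Resources resources;

        private Requirements(String vertexId, Resources resources) {
            this.vertexId = vertexId;
            this.resources = resources;
        }

        static final class Builder {
            private String vertexId;
            private Resources resources;

            Builder withVertexId(String vertexId) {
                this.vertexId = vertexId;
                return this;
            }

            Builder withResources(Resources resources) {
                this.resources = resources;
                return this;
            }

            Requirements build() {
                // Fail fast instead of silently using a default profile.
                return new Requirements(
                        Objects.requireNonNull(vertexId, "vertexId must be set"),
                        Objects.requireNonNull(resources, "resources must be set"));
            }
        }
    }

    public static void main(String[] args) {
        Requirements complete = new Requirements.Builder()
                .withVertexId("v1")
                .withResources(new Resources(1.0, 1024))
                .build();
        System.out.println(complete.vertexId + " " + complete.resources.memoryMb);

        try {
            new Requirements.Builder().withVertexId("v2").build();
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Running `main` prints `v1 1024` and then `resources must be set`.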


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326512026
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+class DeploymentHandle {
+
+   private final ExecutionVertexVersion requiredVertexVersion;
+
+   private final ExecutionVertexDeploymentOption 
executionVertexDeploymentOption;
+
+   private final SlotExecutionVertexAssignment 
slotExecutionVertexAssignment;
+
+   public DeploymentHandle(
+   final ExecutionVertexVersion requiredVertexVersion,
 
 Review comment:
   Maybe use double indentation for the parameters here?


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326511036
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = 
checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new 
ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), 
restartBackoffTimeStrategy);
+   this.schedulingStrategy = 
schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), 
getJobGraph());
+   this.executionSlotAllocator = new 
DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), 
slotRequestTimeout);
+   setTaskFailureListener(new 
UpdateSchedulerNgOnInternalTaskFailuresListener(this, 
getJobGraph().getJobID()));
+   }
+
+   // 

+   // SchedulerNG
+   // 

+
+   @Override
+   protected void startSchedulingInternal() {
+   prepareExecutionGraphForScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+   final Optional<ExecutionVertexID> executionVertexIdOptional = 
getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = 
executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   
schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, 
executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState 
taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == 
ExecutionState.FAILED) {
+   final Throwable error = 
taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID 
executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = 
executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult 
failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult 
failureHandlingResult) {
+   final Set<ExecutionVertexID> verticesToRestart = 
failureHandlingResult.getVerticesToRestart();
+
+   final Set<ExecutionVertexVersion> executionVertexVersions =
+   new 
HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture<?> cancelFuture = 
cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
+   () -> FutureUtils.assertNoException(
+   
cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), 
getMainThreadExecutor())),
+   failureHandlingResult.getRestartDelayMS(),
+   TimeUnit.MILLISECONDS);
+   }
+
+   private BiFunction<Object, Throwable, Void> 
restartTasksOrHandleError(final Set<ExecutionVertexVersion> 
executionVertexVersions) {
+   return (Object ignored, Throwable throwable) -> {
+
+   if (throwable == null) {
+   final Set<ExecutionVertexID> verticesToRestart 
= 
executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+   
schedulingStrategy.restartTasks(verticesToRestart);
+   } else {
+   failJob(throwable);
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326502860
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+class DeploymentHandle {
 
 Review comment:
   We should add Javadocs for it.
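   A possible Javadoc, written only from the three fields visible in this diff; the wording is a suggestion and not necessarily the text used in Flink. The field types are replaced by empty stand-ins, the getter names are hypothetical, and `Objects.requireNonNull` stands in for `Preconditions.checkNotNull` so the snippet compiles on its own. The constructor parameters use double indentation.

```java
import java.util.Objects;

class DeploymentHandleJavadocSketch {

    // Empty stand-ins so the sketch compiles on its own; the real classes are in flink-runtime.
    static final class ExecutionVertexVersion {}
    static final class ExecutionVertexDeploymentOption {}
    static final class SlotExecutionVertexAssignment {}

    /**
     * Holds the information needed to deploy a single execution vertex.
     *
     * <p>It bundles the {@link ExecutionVertexVersion} that the vertex must still have when
     * the deployment happens, the {@link ExecutionVertexDeploymentOption} describing how the
     * vertex should be deployed, and the {@link SlotExecutionVertexAssignment} tracking the
     * slot requested for it.
     */
    static final class DeploymentHandle {

        private final ExecutionVertexVersion requiredVertexVersion;

        private final ExecutionVertexDeploymentOption executionVertexDeploymentOption;

        private final SlotExecutionVertexAssignment slotExecutionVertexAssignment;

        DeploymentHandle(
                final ExecutionVertexVersion requiredVertexVersion,
                final ExecutionVertexDeploymentOption executionVertexDeploymentOption,
                final SlotExecutionVertexAssignment slotExecutionVertexAssignment) {
            this.requiredVertexVersion = Objects.requireNonNull(requiredVertexVersion);
            this.executionVertexDeploymentOption = Objects.requireNonNull(executionVertexDeploymentOption);
            this.slotExecutionVertexAssignment = Objects.requireNonNull(slotExecutionVertexAssignment);
        }

        ExecutionVertexVersion getRequiredVertexVersion() {
            return requiredVertexVersion;
        }

        ExecutionVertexDeploymentOption getExecutionVertexDeploymentOption() {
            return executionVertexDeploymentOption;
        }

        SlotExecutionVertexAssignment getSlotExecutionVertexAssignment() {
            return slotExecutionVertexAssignment;
        }
    }

    public static void main(String[] args) {
        ExecutionVertexVersion version = new ExecutionVertexVersion();
        DeploymentHandle handle = new DeploymentHandle(
                version, new ExecutionVertexDeploymentOption(), new SlotExecutionVertexAssignment());
        System.out.println(handle.getRequiredVertexVersion() == version);
    }
}
```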


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326495983
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalTaskFailuresListener.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Calls {@link SchedulerNG#updateTaskExecutionState(TaskExecutionState)} on 
task failure.
+ */
+class UpdateSchedulerNgOnInternalTaskFailuresListener implements 
InternallyDetectedTaskFailuresListener {
+
+   private final SchedulerNG schedulerNg;
+
+   private final JobID jobId;
+
+   public UpdateSchedulerNgOnInternalTaskFailuresListener(
+   final SchedulerNG schedulerNg,
 
 Review comment:
   Maybe use double indentation for the parameters here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326511036
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +128,293 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.log = log;
+
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.executionVertexOperations = checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+   this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+   this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+   this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+   setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID()));
+   }
+
+   // 
+   // SchedulerNG
+   // 
+
+   @Override
+   protected void startSchedulingInternal() {
+   prepareExecutionGraphForScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+   final Optional executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+   final Throwable error = taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+   final Set verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+   final Set executionVertexVersions =
+   new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
+   () -> FutureUtils.assertNoException(
+   cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())),
+   failureHandlingResult.getRestartDelayMS(),
+   TimeUnit.MILLISECONDS);
+   }
+
+   private BiFunction restartTasksOrHandleError(final Set executionVertexVersions) {
+   return (Object ignored, Throwable throwable) -> {
+
+   if (throwable == null) {
+   final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+   schedulingStrategy.restartTasks(verticesToRestart);
+   } else {
+   failJob(throwable);
+

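The truncated hunk above records a version for every vertex to restart, cancels those tasks, and after the restart delay restarts only the vertices that were not modified again in the meantime (for example, by a later failover). A minimal sketch of that versioning idea, assuming vertices are identified by plain strings; `VertexVersioner` and `Version` are hypothetical stand-ins for `ExecutionVertexVersioner` and do not reproduce Flink's actual API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for ExecutionVertexVersioner: keeps a modification counter per vertex. */
final class VertexVersioner {

	/** A vertex id paired with the version it had when the modification was recorded. */
	static final class Version {
		final String vertexId;
		final long version;

		Version(final String vertexId, final long version) {
			this.vertexId = vertexId;
			this.version = version;
		}
	}

	private final Map<String, Long> currentVersions = new HashMap<>();

	/** Increments the version of every given vertex and returns the new versions. */
	Set<Version> recordModifications(final Set<String> vertexIds) {
		final Set<Version> versions = new HashSet<>();
		for (final String vertexId : vertexIds) {
			final long newVersion = currentVersions.merge(vertexId, 1L, Long::sum);
			versions.add(new Version(vertexId, newVersion));
		}
		return versions;
	}

	/** Returns the vertices whose current version still equals the recorded one. */
	Set<String> getUnmodifiedVertices(final Set<Version> recorded) {
		final Set<String> unmodified = new HashSet<>();
		for (final Version version : recorded) {
			if (currentVersions.getOrDefault(version.vertexId, 0L) == version.version) {
				unmodified.add(version.vertexId);
			}
		}
		return unmodified;
	}
}
```

If a second failure touches vertex `b` during the restart delay, `b` receives a newer version, so the first failover restarts only `a` and leaves `b` to the second one.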
[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326502137
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Operations on the ExecutionVertex.
+ */
+interface ExecutionVertexOperations {
+
+   void deploy(ExecutionVertex executionVertex) throws JobException;
 
 Review comment:
   Using ExecutionVertexID as the parameter would be better for the interface.
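A minimal sketch of an id-based interface, assuming a hypothetical `VertexId` value class standing in for `ExecutionVertexID`. The `JobException` of the original `deploy` signature is omitted, and the `cancel` method returning a future is an assumption based on the `CompletableFuture` import. Because callers pass an immutable id rather than the mutable `ExecutionVertex`, a test double can record calls without building an execution graph:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ExecutionVertexID: an immutable (job vertex, subtask index) key. */
final class VertexId {
	private final String jobVertexId;
	private final int subtaskIndex;

	VertexId(final String jobVertexId, final int subtaskIndex) {
		this.jobVertexId = Objects.requireNonNull(jobVertexId);
		this.subtaskIndex = subtaskIndex;
	}

	@Override
	public boolean equals(final Object o) {
		if (this == o) {
			return true;
		}
		if (!(o instanceof VertexId)) {
			return false;
		}
		final VertexId that = (VertexId) o;
		return subtaskIndex == that.subtaskIndex && jobVertexId.equals(that.jobVertexId);
	}

	@Override
	public int hashCode() {
		return Objects.hash(jobVertexId, subtaskIndex);
	}
}

/** Id-based variant of the operations interface (hypothetical sketch). */
interface VertexOperations {

	void deploy(VertexId vertexId);

	CompletableFuture<?> cancel(VertexId vertexId);
}

/** Test double: records deployments and acknowledges cancellations immediately. */
final class RecordingVertexOperations implements VertexOperations {

	final List<VertexId> deployed = new ArrayList<>();

	@Override
	public void deploy(final VertexId vertexId) {
		deployed.add(vertexId);
	}

	@Override
	public CompletableFuture<?> cancel(final VertexId vertexId) {
		return CompletableFuture.completedFuture(null);
	}
}
```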




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326495736
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -23,40 +23,93 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Stub implementation of the future default scheduler.
+ * The future default scheduler.
  */
-public class DefaultScheduler extends LegacyScheduler {
+public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {
+
+   private final Logger log;
+
+   private final ClassLoader userCodeLoader;
+
+   private final ExecutionSlotAllocator executionSlotAllocator;
+
+   private final ExecutionFailureHandler executionFailureHandler;
+
+   private final ScheduledExecutor delayExecutor;
+
+   private final SchedulingStrategy schedulingStrategy;
+
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   private final ExecutionVertexOperations executionVertexOperations;
 
public DefaultScheduler(
-   final Logger log,
 
 Review comment:
   Should we have 2 indentations for params in function declaration, as 
suggested in the Flink [code style 
doc](https://flink.apache.org/contributing/code-style-and-quality-formatting.html)?
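For illustration, a declaration following the two-indentation convention the reviewer refers to: the wrapped parameters are indented with two tabs and the body with one, so the two are easy to tell apart. The class, its fields, and its method are hypothetical:

```java
/** Hypothetical class illustrating double indentation for wrapped constructor parameters. */
final class FailureListenerExample {

	private final String schedulerName;

	private final String jobId;

	FailureListenerExample(
			final String schedulerName,
			final String jobId) {
		this.schedulerName = schedulerName;
		this.jobId = jobId;
	}

	String describe() {
		return schedulerName + "@" + jobId;
	}
}
```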




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326502296
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.concurrent.CompletableFuture;
+
+class DefaultExecutionVertexOperations implements ExecutionVertexOperations {
 
 Review comment:
   We need a Javadoc for it.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326499609
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
 
 Review comment:
   We need a Javadoc for it.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326482996
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+class SubmissionTrackingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
 
 Review comment:
   Could you add a Javadoc for this class?




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326485570
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/NeverRestartBackoffTimeStrategy.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+/**
+ * Restart backoff time strategy that never restarts.
+ */
+public enum NeverRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
+
+   INSTANCE;
+
+   @Override
+   public boolean canRestart() {
+   return false;
+   }
+
+   @Override
+   public long getBackoffTime() {
+   return 0;
+   }
+
+   @Override
+   public void notifyFailure(final Throwable cause) {
+
 
 Review comment:
   The empty line is not needed.
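Without the blank line, the enum could read as follows. To keep the sketch compilable on its own, `RestartBackoffTimeStrategy` is redeclared locally with only the three methods visible in the diff; it is a stand-in, not the real Flink interface:

```java
/** Local stand-in declaring only the three methods that appear in the diff. */
interface RestartBackoffTimeStrategy {

	boolean canRestart();

	long getBackoffTime();

	void notifyFailure(Throwable cause);
}

/** Restart backoff time strategy that never restarts. */
enum NeverRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {

	INSTANCE;

	@Override
	public boolean canRestart() {
		return false;
	}

	@Override
	public long getBackoffTime() {
		return 0;
	}

	@Override
	public void notifyFailure(final Throwable cause) {
	}
}
```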




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326489325
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
 ##
 @@ -42,6 +42,23 @@
  */
 public interface SlotProvider {
 
+   /**
+* Allocating slot with specific requirement.
+*
+* @param slotRequestId identifying the slot request
+* @param scheduledUnit The task to allocate the slot for
+* @param slotProfile profile of the requested slot
+* @param allocationTimeout after which the allocation fails with a timeout exception
+* @return The future of the allocation
+*/
+   default CompletableFuture allocateSlot(
 
 Review comment:
   It seems this interface method is only used by tests. Do we really need it, or can we make the tests invoke the underlying method directly?




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326484256
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+class SubmissionTrackingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
+
+   private final BlockingQueue taskDeploymentDescriptors = new LinkedBlockingDeque<>();
+
+   public void setFailSubmission(final boolean failSubmission) {
+   this.failSubmission = failSubmission;
+   }
+
+   private boolean failSubmission;
 
 Review comment:
   It's better to put the field definition in the head part of the class.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326487486
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 ##
 @@ -44,13 +44,18 @@
/** Config name for the {@link RestartPipelinedRegionStrategy}. */
public static final String LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-legacy";
 
+   /** Config name for the {@link NoOpFailoverStrategy} */
+   public static final String NO_OP_FAILOVER_STRATEGY = "noop";
+
// 
 
/**
 * Loads a FailoverStrategy Factory from the given configuration.
 */
public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config, @Nullable Logger logger) {
-   final String strategyParam = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY);
+   final String strategyParam = config.getString(JobManagerOptions.SCHEDULER).equals("ng") ?
+   NO_OP_FAILOVER_STRATEGY :
 
 Review comment:
   How about a `ThrowingFailoverStrategy` here, just like `ThrowingRestartStrategy`? We do not expect the strategy to be used after its creation, so a `ThrowingFailoverStrategy` can help to find potential issues.
   
   We should also guard the `failoverStrategy#notifyNewVertices` invocation in `ExecutionGraph#attachJobGraph` with an `isLegacyScheduling()` check.
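A minimal sketch of both suggestions. `LegacyFailoverStrategy` is a local stand-in that declares only `notifyNewVertices` and a strategy name (vertices are plain strings here), `ThrowingFailoverStrategy` is the hypothetical class proposed above, and `attachJobGraph` models only the `isLegacyScheduling()` guard around the `notifyNewVertices` call, not the real `ExecutionGraph#attachJobGraph`:

```java
import java.util.List;

/** Local stand-in for the legacy failover strategy; only the methods needed here are declared. */
abstract class LegacyFailoverStrategy {

	abstract void notifyNewVertices(List<String> newJobVerticesTopological);

	abstract String getStrategyName();
}

/** Hypothetical strategy that may be created but must never be used by the new scheduler. */
final class ThrowingFailoverStrategy extends LegacyFailoverStrategy {

	@Override
	void notifyNewVertices(final List<String> newJobVerticesTopological) {
		throw new IllegalStateException("Unexpected call to the failover strategy with the new scheduler");
	}

	@Override
	String getStrategyName() {
		return "throwing";
	}
}

/** Models only the guard around notifyNewVertices suggested for attachJobGraph. */
final class AttachJobGraphSketch {

	static void attachJobGraph(
			final LegacyFailoverStrategy failoverStrategy,
			final boolean legacyScheduling,
			final List<String> newVertices) {
		if (legacyScheduling) {
			failoverStrategy.notifyNewVertices(newVertices);
		}
	}
}
```

With the guard in place, the new scheduler never reaches the throwing strategy; if some other code path still calls it, the exception surfaces the problem immediately.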




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326483593
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+class SubmissionTrackingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
+
+   private final BlockingQueue taskDeploymentDescriptors = new LinkedBlockingDeque<>();
+
+   public void setFailSubmission(final boolean failSubmission) {
 
 Review comment:
   The method is never used. Seems we lack a test for submission failure.
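A minimal sketch of what such a test could check, assuming a simplified `TrackingGateway` that stands in for `SubmissionTrackingTaskManagerGateway` (tasks are plain strings and the acknowledgement is a string rather than `Acknowledge`), with its fields declared at the head of the class. A real test would drive the scheduler and assert on the resulting vertex state instead:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;

/** Hypothetical, simplified gateway that records submissions and can be told to fail them. */
class TrackingGateway {

	private final BlockingQueue<String> submittedTasks = new LinkedBlockingDeque<>();

	private boolean failSubmission;

	public void setFailSubmission(final boolean failSubmission) {
		this.failSubmission = failSubmission;
	}

	public CompletableFuture<String> submitTask(final String task) {
		if (failSubmission) {
			// Simulate a TaskManager rejecting the deployment; nothing is recorded.
			final CompletableFuture<String> failed = new CompletableFuture<>();
			failed.completeExceptionally(new RuntimeException("Simulated submission failure"));
			return failed;
		}
		submittedTasks.add(task);
		return CompletableFuture.completedFuture("ack");
	}

	public int numberOfSubmissions() {
		return submittedTasks.size();
	}
}
```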


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-19 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326241629
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 ##
 @@ -66,14 +77,30 @@ public SchedulerNG createInstance(
jobMasterConfiguration,
slotProvider,
futureExecutor,
+   new ScheduledExecutorServiceAdapter(futureExecutor),
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
-   partitionTracker);
+   partitionTracker,
+   schedulingStrategyFactory,
+   new RestartPipelinedRegionStrategy.Factory(),
 
 Review comment:
   I opened a ticket, FLINK-14131, for this. We can do it after this PR is done.
   
   And we then need to consider whether to respect the existing "jobmanager.execution.failover-strategy" config.
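If the new scheduler were to respect `jobmanager.execution.failover-strategy`, the lookup might resemble the sketch below. The enum, the loader, and the plain `Map` configuration are hypothetical. The values `full` and `region` are assumptions, and falling back to `region` mirrors the hard-coded `RestartPipelinedRegionStrategy.Factory` in this PR:

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical failover strategies selectable for the new scheduler. */
enum NgFailoverStrategy {
	RESTART_ALL,
	RESTART_PIPELINED_REGION
}

/** Hypothetical loader mapping the existing config option onto the new strategies. */
final class NgFailoverStrategyLoader {

	static final String FAILOVER_STRATEGY_KEY = "jobmanager.execution.failover-strategy";

	static NgFailoverStrategy load(final Map<String, String> config) {
		final String value = config.getOrDefault(FAILOVER_STRATEGY_KEY, "region");
		switch (value.toLowerCase(Locale.ROOT)) {
			case "full":
				return NgFailoverStrategy.RESTART_ALL;
			case "region":
				return NgFailoverStrategy.RESTART_PIPELINED_REGION;
			default:
				// Legacy-only values such as "region-legacy" are rejected rather than silently mapped.
				throw new IllegalArgumentException("Unsupported failover strategy for the new scheduler: " + value);
		}
	}
}
```

Rejecting unknown values rather than ignoring them makes a misconfiguration visible at job submission instead of silently changing failover behavior.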




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-18 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r325636828
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -1179,11 +1197,19 @@ private void sendPartitionInfos() {
//  Internal Actions
// 
 
+   private boolean isLegacyScheduling() {
+   return getVertex().isLegacyScheduling();
+   }
+
private boolean processFail(Throwable t, boolean isCallback) {
return processFail(t, isCallback, null, null, true);
}
 
private boolean processFail(Throwable t, boolean isCallback, Map> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
 
 Review comment:
   This method is only invoked by `processFail(Throwable t, boolean 
isCallback)`.
   I think it's not needed anymore.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-18 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r325634779
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -1032,9 +1046,13 @@ void markFailed(Throwable t) {
}
 
void markFailed(Throwable t, Map> userAccumulators, IOMetrics metrics) {
 
 Review comment:
   This method is used by tests only. We may replace it or mark it with 
@VisibleForTesting.
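If the overload stays, annotating it documents that production code should not call it. In this sketch, `VisibleForTesting` is redeclared locally so the block compiles without Flink on the classpath (Flink ships an annotation of that name in `org.apache.flink.annotation`), and `ExecutionSketch` is a hypothetical, heavily simplified class:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Collections;
import java.util.Map;

/** Local stand-in for Flink's VisibleForTesting annotation. */
@Documented
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@interface VisibleForTesting {
}

/** Hypothetical, simplified execution with a test-only overload. */
final class ExecutionSketch {

	private Throwable failureCause;

	private Map<String, Object> userAccumulators = Collections.emptyMap();

	void markFailed(final Throwable t) {
		markFailed(t, Collections.emptyMap());
	}

	/** Only tests need to pass accumulators explicitly. */
	@VisibleForTesting
	void markFailed(final Throwable t, final Map<String, Object> userAccumulators) {
		this.failureCause = t;
		this.userAccumulators = userAccumulators;
	}

	Throwable getFailureCause() {
		return failureCause;
	}

	Map<String, Object> getUserAccumulators() {
		return userAccumulators;
	}
}
```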




[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-18 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r325660299
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -1032,9 +1046,13 @@ void markFailed(Throwable t) {
}
 
void markFailed(Throwable t, Map> 
userAccumulators, IOMetrics metrics) {
+   markFailed(t, userAccumulators, metrics, false);
+   }
+
+   void markFailed(Throwable t, Map> 
userAccumulators, IOMetrics metrics, boolean fromSchedulerNg) {
// skip release of partitions since this is only called if the 
TM actually sent the FAILED state update
 
 Review comment:
   This comment seems to be outdated.
   With the changes in this PR, its assumption no longer holds, since the 
failure may be triggered internally by the JM. This may not be a problem, 
though, since such errors originate from `processFail(Throwable t, boolean 
isCallback)`, which triggers the release of the partitions earlier.
   


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324658029
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 ##
 @@ -66,14 +77,30 @@ public SchedulerNG createInstance(
jobMasterConfiguration,
slotProvider,
futureExecutor,
+   new ScheduledExecutorServiceAdapter(futureExecutor),
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
-   partitionTracker);
+   partitionTracker,
+   schedulingStrategyFactory,
+   new RestartPipelinedRegionStrategy.Factory(),
 
 Review comment:
   In addition, FLINK-13056 introduces a FastRestartPipelinedRegionStrategy 
as an option for faster failover processing.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324593492
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 ##
 @@ -66,14 +77,30 @@ public SchedulerNG createInstance(
jobMasterConfiguration,
slotProvider,
futureExecutor,
+   new ScheduledExecutorServiceAdapter(futureExecutor),
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
-   partitionTracker);
+   partitionTracker,
+   schedulingStrategyFactory,
+   new RestartPipelinedRegionStrategy.Factory(),
 
 Review comment:
   @tillrohrmann an individual restart strategy is needed if we want to 
restart only the failed task during recovery. Alternatively, we could provide 
a pluggable mechanism so that developers can implement it outside of Flink 
core. Here's the 
[ML](https://lists.apache.org/thread.html/237de1061283a18dd813043a84d5d1debb15d1220368fa2f5b801025@%3Cdev.flink.apache.org%3E)
 thread for the individual failover discussion.
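
   One possible shape for the pluggable route is sketched below with the JDK's `java.util.ServiceLoader`. The `FailoverStrategyPlugin` interface, its methods, and the name-based lookup are all hypothetical and are not an existing Flink API. A plugin jar would register an implementation in a `META-INF/services` file. The loader falls back to a built-in strategy when no registered plugin matches.

```java
import java.util.Collections;
import java.util.ServiceLoader;
import java.util.Set;

public class PluggableFailoverSketch {

    /** Hypothetical SPI that a developer could implement outside Flink core. */
    public interface FailoverStrategyPlugin {
        /** Name used to select the plugin, e.g. from configuration. */
        String name();

        /** Vertices to restart after the given vertex failed. */
        Set<String> verticesToRestart(String failedVertex);
    }

    /** Individual failover: restart only the failed vertex. */
    public static class IndividualRestartStrategy implements FailoverStrategyPlugin {
        @Override
        public String name() {
            return "individual";
        }

        @Override
        public Set<String> verticesToRestart(String failedVertex) {
            return Collections.singleton(failedVertex);
        }
    }

    /**
     * Returns the first plugin on the classpath whose name matches (registered via
     * META-INF/services/PluggableFailoverSketch$FailoverStrategyPlugin), else the fallback.
     */
    public static FailoverStrategyPlugin load(String name, FailoverStrategyPlugin fallback) {
        for (FailoverStrategyPlugin plugin : ServiceLoader.load(FailoverStrategyPlugin.class)) {
            if (plugin.name().equals(name)) {
                return plugin;
            }
        }
        return fallback;
    }
}
```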


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324528408
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.restartBackoffTimeStrategy = 
checkNotNull(restartBackoffTimeStrategy);
+   this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
+   this.slotProvider = checkNotNull(slotProvider);
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.schedulingStrategyFactory = 
checkNotNull(schedulingStrategyFactory);
+   this.failoverStrategyFactory = 
checkNotNull(failoverStrategyFactory);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = 
checkNotNull(executionVertexVersioner);
+   this.conditionalFutureHandlerFactory = new 
ConditionalFutureHandlerFactory(executionVertexVersioner);
}
 
+   // 

+   // SchedulerNG
+   // 

+
@Override
-   public void startScheduling() {
-   throw new UnsupportedOperationException();
+   public void startSchedulingInternal() {
+   initializeScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   private void initializeScheduling() {
+   executionFailureHandler = new 
ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), 
restartBackoffTimeStrategy);
+   schedulingStrategy = 
schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), 
getJobGraph());
+   executionSlotAllocator = new 
DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), 
slotRequestTimeout);
+   setTaskFailureListener(new 
UpdateTaskExecutionStateInDefaultSchedulerListener(this, 
getJobGraph().getJobID()));
+   prepareExecutionGraphForScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+   final Optional executionVertexIdOptional = 
getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = 
executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   
schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
 
 Review comment:
   It seems that, this way, the schedulingStrategy is not aware of internal 
state changes such as SCHEDULED or DEPLOYING, and can only trigger scheduling 
upon external state changes such as RUNNING or FINISHED. Some state changes, 
like CANCELED or FAILED, may be notified when they are reported by the task 
manager, but not when they result from internal scheduler errors.
   


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324526676
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -1211,7 +1231,10 @@ private boolean processFail(Throwable t, boolean 
isCallback, Maphttps://github.com/apache/flink/pull/9663#pullrequestreview-288377060).


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324476406
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.restartBackoffTimeStrategy = 
checkNotNull(restartBackoffTimeStrategy);
+   this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
+   this.slotProvider = checkNotNull(slotProvider);
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.schedulingStrategyFactory = 
checkNotNull(schedulingStrategyFactory);
+   this.failoverStrategyFactory = 
checkNotNull(failoverStrategyFactory);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = 
checkNotNull(executionVertexVersioner);
+   this.conditionalFutureHandlerFactory = new 
ConditionalFutureHandlerFactory(executionVertexVersioner);
}
 
+   // 

+   // SchedulerNG
+   // 

+
@Override
-   public void startScheduling() {
-   throw new UnsupportedOperationException();
+   public void startSchedulingInternal() {
 
 Review comment:
   No need to be public.
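
   Consider the following illustration. It assumes that the base class declares the hook as `protected abstract` and calls it from a public `startScheduling()`; the base-class declaration is not shown in this hunk. Under that assumption, the override can stay `protected`, because Java only forbids an override from being less visible than the method it overrides. The class names here are hypothetical.

```java
public class TemplateMethodSketch {

    /** Hypothetical base class standing in for SchedulerBase. */
    public abstract static class SchedulerBaseSketch {
        private boolean started;

        /** Public entry point; shared checks live in the base class. */
        public final void startScheduling() {
            if (started) {
                throw new IllegalStateException("Scheduling has already been started.");
            }
            started = true;
            startSchedulingInternal();
        }

        /** Hook that only the base class calls, so protected visibility is enough. */
        protected abstract void startSchedulingInternal();
    }

    /** Hypothetical subclass standing in for DefaultScheduler. */
    public static class DefaultSchedulerSketch extends SchedulerBaseSketch {
        public int internalStarts;

        @Override
        protected void startSchedulingInternal() {
            internalStarts++;
        }
    }
}
```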


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324505819
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -0,0 +1,727 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskFailureListener;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import 

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324502833
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 ##
 @@ -66,14 +77,30 @@ public SchedulerNG createInstance(
jobMasterConfiguration,
slotProvider,
futureExecutor,
+   new ScheduledExecutorServiceAdapter(futureExecutor),
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
-   partitionTracker);
+   partitionTracker,
+   schedulingStrategyFactory,
+   new RestartPipelinedRegionStrategy.Factory(),
 
 Review comment:
   It may be a bit out of scope for this PR, but I think we may need a failover 
strategy factory loader to support multiple failover strategies.
   
   I'm also thinking of migrating the restart-all strategy.
   The restart-all strategy may be useful in cases where users want the job 
vertices to be either all running or none running.
   Besides, if we'd like to support a single-vertex restart strategy in the 
future, we may need an individual restart strategy, or simply support 
pluggable failover strategies so that there is a way to do so.
   
   This is out of the scope of this PR anyhow. I can work on it if we agree 
to have it.
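
   Below is a rough sketch of such a loader, assuming strategies are selected by a configured name. `FailoverStrategy`, `Topology`, and the keys `full`, `region`, and `individual` are hypothetical. They only illustrate restart-all, region, and single-vertex failover on a toy vertex-to-region map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

public class FailoverStrategyLoaderSketch {

    /** Hypothetical strategy: which vertices must restart when one vertex fails. */
    public interface FailoverStrategy {
        Set<String> getTasksNeedingRestart(String failedVertex);
    }

    /** Hypothetical toy topology: each vertex mapped to its pipelined region id. */
    public static final class Topology {
        private final Map<String, Integer> regionOf;

        public Topology(Map<String, Integer> regionOf) {
            this.regionOf = regionOf;
        }
    }

    private static final Map<String, Function<Topology, FailoverStrategy>> FACTORIES = new HashMap<>();

    static {
        // restart-all: every vertex in the job restarts
        FACTORIES.put("full", topology -> failed -> new TreeSet<>(topology.regionOf.keySet()));
        // region: every vertex in the failed vertex's region restarts
        FACTORIES.put("region", topology -> failed -> {
            Integer region = topology.regionOf.get(failed);
            Set<String> result = new TreeSet<>();
            for (Map.Entry<String, Integer> entry : topology.regionOf.entrySet()) {
                if (entry.getValue().equals(region)) {
                    result.add(entry.getKey());
                }
            }
            return result;
        });
        // individual: only the failed vertex restarts
        FACTORIES.put("individual", topology -> failed -> new TreeSet<>(Collections.singleton(failed)));
    }

    /** Selects a failover strategy by its configured name. */
    public static FailoverStrategy load(String name, Topology topology) {
        Function<Topology, FailoverStrategy> factory = FACTORIES.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown failover strategy: " + name);
        }
        return factory.apply(topology);
    }
}
```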


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324477116
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+   this.restartBackoffTimeStrategy = 
checkNotNull(restartBackoffTimeStrategy);
+   this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
+   this.slotProvider = checkNotNull(slotProvider);
+   this.delayExecutor = checkNotNull(delayExecutor);
+   this.userCodeLoader = checkNotNull(userCodeLoader);
+   this.schedulingStrategyFactory = 
checkNotNull(schedulingStrategyFactory);
+   this.failoverStrategyFactory = 
checkNotNull(failoverStrategyFactory);
+   this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
+   this.executionVertexVersioner = 
checkNotNull(executionVertexVersioner);
+   this.conditionalFutureHandlerFactory = new 
ConditionalFutureHandlerFactory(executionVertexVersioner);
}
 
+   // 

+   // SchedulerNG
+   // 

+
@Override
-   public void startScheduling() {
-   throw new UnsupportedOperationException();
+   public void startSchedulingInternal() {
+   initializeScheduling();
+   schedulingStrategy.startScheduling();
+   }
+
+   private void initializeScheduling() {
+   executionFailureHandler = new 
ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), 
restartBackoffTimeStrategy);
+   schedulingStrategy = 
schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), 
getJobGraph());
+   executionSlotAllocator = new 
DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), 
slotRequestTimeout);
+   setTaskFailureListener(new 
UpdateTaskExecutionStateInDefaultSchedulerListener(this, 
getJobGraph().getJobID()));
+   prepareExecutionGraphForScheduling();
+   }
+
+   @Override
+   public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+   final Optional executionVertexIdOptional = 
getExecutionVertexId(taskExecutionState.getID());
+   if (executionVertexIdOptional.isPresent()) {
+   final ExecutionVertexID executionVertexId = 
executionVertexIdOptional.get();
+   updateState(taskExecutionState);
+   
schedulingStrategy.onExecutionStateChange(executionVertexId, 
taskExecutionState.getExecutionState());
+   maybeHandleTaskFailure(taskExecutionState, 
executionVertexId);
+   return true;
+   }
+
+   return false;
+   }
+
+   private void maybeHandleTaskFailure(final TaskExecutionState 
taskExecutionState, final ExecutionVertexID executionVertexId) {
+   if (taskExecutionState.getExecutionState() == 
ExecutionState.FAILED) {
+   final Throwable error = 
taskExecutionState.getError(userCodeLoader);
+   handleTaskFailure(executionVertexId, error);
+   }
+   }
+
+   private void handleTaskFailure(final ExecutionVertexID 
executionVertexId, final Throwable error) {
+   final FailureHandlingResult failureHandlingResult = 
executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+   maybeRestartTasks(failureHandlingResult);
+   }
+
+   private void maybeRestartTasks(final FailureHandlingResult 
failureHandlingResult) {
+   if (failureHandlingResult.canRestart()) {
+   restartTasksWithDelay(failureHandlingResult);
+   } else {
+   failJob(failureHandlingResult.getError());
+   }
+   }
+
+   private void restartTasksWithDelay(final FailureHandlingResult 
failureHandlingResult) {
+   final Set verticesToRestart = 
failureHandlingResult.getVerticesToRestart();
+
+   final Set executionVertexVersions =
+   new 
HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+   final CompletableFuture cancelFuture = 
cancelTasksAsync(verticesToRestart);
+
+   delayExecutor.schedule(
 
 Review comment:
   This makes the delay the "time span between the task failure and 
restarting the tasks".
   Previously it was "time span between completing affected tasks 
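
   The sketch below makes the difference concrete with `CompletableFuture` and a `ScheduledExecutorService`. It reads the truncated comment as follows: previously the backoff started after cancellation of the affected tasks had completed, whereas now it starts at the failure. Cancellation is simulated. The new path is assumed to still wait for cancellation before restarting; this is an assumption because the hunk is cut off at `delayExecutor.schedule(`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RestartDelaySketch {

    /** Previous behaviour (as read from the comment): backoff starts once cancellation completed. */
    public static CompletableFuture<Long> backoffAfterCancellation(
            ScheduledExecutorService executor, long cancelMillis, long backoffMillis) {
        long failureNanos = System.nanoTime();
        CompletableFuture<Long> restarted = new CompletableFuture<>();
        cancelTasksAsync(executor, cancelMillis).thenRun(() ->
                executor.schedule(() -> { restarted.complete(elapsedMillis(failureNanos)); },
                        backoffMillis, TimeUnit.MILLISECONDS));
        return restarted;
    }

    /** New behaviour: backoff starts at the failure; restart also waits for cancellation (assumed). */
    public static CompletableFuture<Long> backoffFromFailure(
            ScheduledExecutorService executor, long cancelMillis, long backoffMillis) {
        long failureNanos = System.nanoTime();
        CompletableFuture<Void> cancelFuture = cancelTasksAsync(executor, cancelMillis);
        CompletableFuture<Long> restarted = new CompletableFuture<>();
        executor.schedule(() -> {
            cancelFuture.thenRun(() -> restarted.complete(elapsedMillis(failureNanos)));
        }, backoffMillis, TimeUnit.MILLISECONDS);
        return restarted;
    }

    /** Simulated asynchronous cancellation of the affected tasks. */
    private static CompletableFuture<Void> cancelTasksAsync(ScheduledExecutorService executor, long cancelMillis) {
        CompletableFuture<Void> cancelled = new CompletableFuture<>();
        executor.schedule(() -> { cancelled.complete(null); }, cancelMillis, TimeUnit.MILLISECONDS);
        return cancelled;
    }

    private static long elapsedMillis(long startNanos) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }
}
```

   With a simulated 50 ms cancellation and a 200 ms backoff, the first variant restarts after at least 250 ms and the second after at least 200 ms.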

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-16 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324505814
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -0,0 +1,727 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskFailureListener;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import 

[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-12 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r323755152
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 ##
 @@ -23,11 +23,19 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NeverRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
 
 Review comment:
   Agreed. I think we should rename them when removing the legacy restart 
strategies and failover strategies, maybe in 1.11.

