[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5487


---



2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170179361
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> rescaleJob(
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            allOperators.add(jobVertex.getID());
+        }
+
+        return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> rescaleOperators(
--- End diff --

ok


---



2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170179289
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}.
+ */
+public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> {
+
+    public static final String KEY = "parallelism";
+
+    protected RescaleParallelismQueryParameter() {
+        super(KEY, MessageParameterRequisiteness.MANDATORY);
+    }
+
+    @Override
+    public Integer convertValueFromString(String value) {
+        return Integer.valueOf(value);
--- End diff --

ok


---



2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170179272
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> rescaleJob(
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            allOperators.add(jobVertex.getID());
+        }
+
+        return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> rescaleOperators(
+            Collection<JobVertexID> operators,
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        // 1. Check whether we can rescale the job & rescale the respective vertices
+        for (JobVertexID jobVertexId : operators) {
+            final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+            // update max parallelism in case that it has not been configured
+            final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+            if (executionJobVertex != null) {
+                jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+            }
+
+            try {
+                rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+            } catch (FlinkException e) {
+                final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
+
+                log.info(msg, e);
+
+                return FutureUtils.completedExceptionally(
+                    new JobModificationException(msg, e));
+            }
+        }
+
+        final ExecutionGraph currentExecutionGraph = executionGraph;
+
+        final ExecutionGraph newExecutionGraph;
+
+        try {
+            newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+                null,
+                jobGraph,
+                jobMasterConfiguration.getConfiguration(),
+                scheduledExecutorService,
+                scheduledExecutorService,
+                slotPool.getSlotProvider(),
+                userCodeLoader,
+                highAvailabilityServices.getCheckpointRecoveryFactory(),
+                rpcTimeout,
+                currentExecutionGraph.getRestartStrategy(),
+                jobMetricGroup,
+                1,
+                blobServer,
+                jobMasterConfiguration.getSlotRequestTimeout(),
+                log);
+        } catch (JobExecutionException | JobException e) {
+            return FutureUtils.completedExceptionally(
+                new JobModificationException("Could not create rescaled ExecutionGraph.", e));
+        }
+
+        // 3. disable checkpoint coordinator to suppress subsequent checkpoints
+        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        // 4. take a savepoint
+        final CompletableFuture<String> savepointFuture = triggerSavepoint(
+            jobMasterConfiguration.getTmpDirectory(),
+            timeout);
+
+        final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
+            .thenApplyAsync(
+                (String savepointPath) -> {
+                    try {
+                        newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
+                            savepointPath,
+                            false,
+                            newExecutionGraph.getAllVertices(),
+                            userCodeLoader);
+                    } catch (Exception e) {


2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170016414
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java ---
@@ -0,0 +1,39 @@
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}.
+ */
+public class RescalingStatusMessageParameters extends JobMessageParameters {
+
+    public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter();
+
+    @Override
+    public Collection<MessagePathParameter<?>> getPathParameters() {
+        return Arrays.asList(jobPathParameter, triggerIdPathParameter);
--- End diff --

True, there will be other places to fix as well. I'll leave it for a follow-up
task.


---



2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170015457
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---
@@ -0,0 +1,41 @@
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}.
+ */
+public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> {
+
+    public static final String KEY = "parallelism";
+
+    protected RescaleParallelismQueryParameter() {
+        super(KEY, MessageParameterRequisiteness.MANDATORY);
+    }
+
+    @Override
+    public Integer convertValueFromString(String value) {
+        return Integer.valueOf(value);
--- End diff --

Will add it to the `JobMaster`.


---



2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170014907
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> rescaleJob(
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            allOperators.add(jobVertex.getID());
+        }
+
+        return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> rescaleOperators(
+            Collection<JobVertexID> operators,
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        // 1. Check whether we can rescale the job & rescale the respective vertices
+        for (JobVertexID jobVertexId : operators) {
+            final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+            // update max parallelism in case that it has not been configured
+            final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+            if (executionJobVertex != null) {
+                jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+            }
+
+            try {
+                rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+            } catch (FlinkException e) {
+                final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
+
+                log.info(msg, e);
+
+                return FutureUtils.completedExceptionally(
+                    new JobModificationException(msg, e));
+            }
+        }
+
+        final ExecutionGraph currentExecutionGraph = executionGraph;
+
+        final ExecutionGraph newExecutionGraph;
+
+        try {
+            newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+                null,
+                jobGraph,
+                jobMasterConfiguration.getConfiguration(),
+                scheduledExecutorService,
+                scheduledExecutorService,
+                slotPool.getSlotProvider(),
+                userCodeLoader,
+                highAvailabilityServices.getCheckpointRecoveryFactory(),
+                rpcTimeout,
+                currentExecutionGraph.getRestartStrategy(),
+                jobMetricGroup,
+                1,
+                blobServer,
+                jobMasterConfiguration.getSlotRequestTimeout(),
+                log);
+        } catch (JobExecutionException | JobException e) {
+            return FutureUtils.completedExceptionally(
+                new JobModificationException("Could not create rescaled ExecutionGraph.", e));
+        }
+
+        // 3. disable checkpoint coordinator to suppress subsequent checkpoints
+        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        // 4. take a savepoint
+        final CompletableFuture<String> savepointFuture = triggerSavepoint(
+            jobMasterConfiguration.getTmpDirectory(),
+            timeout);
+
+        final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
+            .thenApplyAsync(
+                (String savepointPath) -> {
+                    try {
+                        newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
+                            savepointPath,
+                            false,
+                            newExecutionGraph.getAllVertices(),
+                            userCodeLoader);
+                    } catch (Exception


2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170014666
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -963,6 +1120,36 @@ private Acknowledge suspendExecution(final Exception cause) {
         return Acknowledge.get();
     }
 
+    /**
+     * Schedules the execution of the current {@link ExecutionGraph}.
+     */
+    private void scheduleExecutionGraph() {
+        try {
+            executionGraph.scheduleForExecution();
+        }
+        catch (Throwable t) {
+            executionGraph.failGlobal(t);
+        }
+    }
+
+    /**
+     * Dispose the savepoint stored under the given path.
+     *
+     * @param savepointPath path where the savepoint is stored
+     */
+    private void disposeSavepoint(String savepointPath) {
+        try {
+            // delete the temporary savepoint
+            Checkpoints.disposeSavepoint(
+                savepointPath,
+                jobMasterConfiguration.getConfiguration(),
+                userCodeLoader,
+                log);
+        } catch (FlinkException | IOException de) {
--- End diff --

This is a typo. Will change it.


---



2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170014483
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> rescaleJob(
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            allOperators.add(jobVertex.getID());
+        }
+
+        return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> rescaleOperators(
--- End diff --

Yes, it will be necessary once we rescale individual operators, which will
be the case with the rescaling policies.


---



2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170014030
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java ---
@@ -0,0 +1,50 @@
+
+package org.apache.flink.util.function;
+
+import java.util.function.BiConsumer;
+
+/**
+ * A checked extension of the {@link BiConsumer} interface.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <E> type of the thrown exception
+ */
+@FunctionalInterface
+public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
+
+    /**
+     * Performs this operation on the given arguments.
+     *
+     * @param t the first input argument
+     * @param u the second input argument
+     * @throws E in case of an error
+     */
+    void acceptWithException(T t, U u) throws E;
+
+    @Override
+    default void accept(T t, U u) {
+        try {
+            acceptWithException(t, u);
+        } catch (Throwable e) {
--- End diff --

True, will change it to `ExceptionUtils.rethrow`.
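A minimal standalone sketch of that change, for illustration only. `CheckedBiConsumer` and `rethrow` are hypothetical stand-ins, not Flink classes; `rethrow` mimics the behaviour assumed for `ExceptionUtils.rethrow`: `Error`s and `RuntimeException`s are rethrown unchanged, and checked exceptions are wrapped in a `RuntimeException`. An `Error` raised by `acceptWithException` therefore still leaves `accept` as-is instead of being buried in a wrapper.

```java
import java.util.function.BiConsumer;

public class CheckedBiConsumerDemo {

    /** Hypothetical stand-in mirroring the shape of BiConsumerWithException. */
    @FunctionalInterface
    interface CheckedBiConsumer<T, U, E extends Throwable> extends BiConsumer<T, U> {

        void acceptWithException(T t, U u) throws E;

        @Override
        default void accept(T t, U u) {
            try {
                acceptWithException(t, u);
            } catch (Throwable e) {
                // Errors and RuntimeExceptions leave unchanged; only checked exceptions get wrapped.
                rethrow(e);
            }
        }
    }

    /** Hypothetical helper mimicking the behaviour assumed for ExceptionUtils.rethrow. */
    static void rethrow(Throwable t) {
        if (t instanceof Error) {
            throw (Error) t;
        } else if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RuntimeException(t);
        }
    }

    public static void main(String[] args) {
        CheckedBiConsumer<String, Integer, Exception> failing = (name, parallelism) -> {
            throw new Exception("cannot rescale " + name + " to " + parallelism);
        };
        try {
            failing.accept("map", 4);
        } catch (RuntimeException e) {
            // The checked exception arrives wrapped, with the original as its cause.
            System.out.println("wrapped: " + e.getCause().getMessage());
        }

        CheckedBiConsumer<String, Integer, Exception> erroring = (name, parallelism) -> {
            throw new AssertionError("not wrapped");
        };
        try {
            erroring.accept("map", 4);
        } catch (AssertionError e) {
            // The Error propagates as the same instance.
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```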


---



2018-02-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r170013389
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java ---
@@ -0,0 +1,49 @@
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+/**
+ * Definition of the rescaling behaviour.
+ */
+public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Integer, FlinkException> {
+    // rescaling is only executed if the operator can be set to the given parallelism
+    STRICT {
+        @Override
+        public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
+            if (jobVertex.getMaxParallelism() < newParallelism) {
+                throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
+                    " because its maximum parallelism " + jobVertex.getMaxParallelism() +
+                    " is smaller than the new parallelism " + newParallelism + '.');
+            } else {
+                jobVertex.setParallelism(newParallelism);
+            }
+        }
+    },
+    // the new parallelism will be the minimum of the given parallelism and the maximum parallelism
+    RELAXED {
--- End diff --

Yes
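The `RELAXED` body is cut off in the quoted diff. Going only by the comments on the two enum constants, their semantics can be sketched as below. This is a hypothetical standalone enum over plain `int`s (no `JobVertex`, no `FlinkException`, `IllegalArgumentException` in their place), not the code in the pull request.

```java
public class RescalingBehaviourSketch {

    /** Hypothetical mirror of the two behaviours, operating on plain ints instead of a JobVertex. */
    enum Behaviour {
        // Fail if the requested parallelism exceeds the maximum parallelism.
        STRICT {
            @Override
            int resolve(int maxParallelism, int newParallelism) {
                if (maxParallelism < newParallelism) {
                    throw new IllegalArgumentException("Cannot rescale: maximum parallelism " + maxParallelism
                        + " is smaller than the new parallelism " + newParallelism + '.');
                }
                return newParallelism;
            }
        },
        // Cap the requested parallelism at the maximum parallelism.
        RELAXED {
            @Override
            int resolve(int maxParallelism, int newParallelism) {
                return Math.min(maxParallelism, newParallelism);
            }
        };

        /** Returns the parallelism the operator would be set to. */
        abstract int resolve(int maxParallelism, int newParallelism);
    }

    public static void main(String[] args) {
        System.out.println(Behaviour.RELAXED.resolve(128, 200)); // prints 128
        System.out.println(Behaviour.STRICT.resolve(128, 64));   // prints 64
        try {
            Behaviour.STRICT.resolve(128, 200);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a maximum parallelism of 128, a request for 200 is rejected under `STRICT` but capped to 128 under `RELAXED`.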


---



2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169922284
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---
@@ -0,0 +1,41 @@
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}.
+ */
+public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> {
+
+    public static final String KEY = "parallelism";
+
+    protected RescaleParallelismQueryParameter() {
+        super(KEY, MessageParameterRequisiteness.MANDATORY);
+    }
+
+    @Override
+    public Integer convertValueFromString(String value) {
+        return Integer.valueOf(value);
--- End diff --

Class was renamed to `RescalingParallelismQueryParameter`


---



2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169916387
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> rescaleJob(
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            allOperators.add(jobVertex.getID());
+        }
+
+        return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> rescaleOperators(
+            Collection<JobVertexID> operators,
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        // 1. Check whether we can rescale the job & rescale the respective vertices
+        for (JobVertexID jobVertexId : operators) {
+            final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+            // update max parallelism in case that it has not been configured
+            final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+            if (executionJobVertex != null) {
+                jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+            }
+
+            try {
+                rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+            } catch (FlinkException e) {
+                final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
+
+                log.info(msg, e);
+
+                return FutureUtils.completedExceptionally(
+                    new JobModificationException(msg, e));
+            }
+        }
+
+        final ExecutionGraph currentExecutionGraph = executionGraph;
+
+        final ExecutionGraph newExecutionGraph;
+
+        try {
+            newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+                null,
+                jobGraph,
+                jobMasterConfiguration.getConfiguration(),
+                scheduledExecutorService,
+                scheduledExecutorService,
+                slotPool.getSlotProvider(),
+                userCodeLoader,
+                highAvailabilityServices.getCheckpointRecoveryFactory(),
+                rpcTimeout,
+                currentExecutionGraph.getRestartStrategy(),
+                jobMetricGroup,
+                1,
+                blobServer,
+                jobMasterConfiguration.getSlotRequestTimeout(),
+                log);
+        } catch (JobExecutionException | JobException e) {
+            return FutureUtils.completedExceptionally(
+                new JobModificationException("Could not create rescaled ExecutionGraph.", e));
+        }
+
+        // 3. disable checkpoint coordinator to suppress subsequent checkpoints
+        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        // 4. take a savepoint
+        final CompletableFuture<String> savepointFuture = triggerSavepoint(
+            jobMasterConfiguration.getTmpDirectory(),
--- End diff --

Why is it ok to use a tmp directory as the target directory? Shouldn't the 
directory be visible from all hosts?


---



2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169634082
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java ---
@@ -0,0 +1,50 @@
+
+package org.apache.flink.util.function;
+
+import java.util.function.BiConsumer;
+
+/**
+ * A checked extension of the {@link BiConsumer} interface.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <E> type of the thrown exception
+ */
+@FunctionalInterface
+public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
+
+    /**
+     * Performs this operation on the given arguments.
+     *
+     * @param t the first input argument
+     * @param u the second input argument
+     * @throws E in case of an error
+     */
+    void acceptWithException(T t, U u) throws E;
+
+    @Override
+    default void accept(T t, U u) {
+        try {
+            acceptWithException(t, u);
+        } catch (Throwable e) {
--- End diff --

Normally, `Error`s shouldn't be caught; see
https://stackoverflow.com/questions/581878/why-catch-exceptions-in-java-when-you-can-catch-throwables


---



2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169919835
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---
@@ -0,0 +1,41 @@
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}.
+ */
+public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> {
+
+    public static final String KEY = "parallelism";
+
+    protected RescaleParallelismQueryParameter() {
+        super(KEY, MessageParameterRequisiteness.MANDATORY);
+    }
+
+    @Override
+    public Integer convertValueFromString(String value) {
+        return Integer.valueOf(value);
--- End diff --

We could already validate here that the integer is greater than 0.
There could also be validation in `Dispatcher#rescaleJob` and
`JobMaster#rescaleJob`.
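A minimal sketch of both checks, assuming a non-positive parallelism should be rejected. `parseParallelism` and `rescaleJob` here are hypothetical standalone methods, not Flink's `convertValueFromString` or `JobMaster#rescaleJob`; the exception type a real `MessageQueryParameter` is expected to throw may differ, and `IllegalArgumentException` is used only for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ParallelismValidationSketch {

    /** Hypothetical parser for the "parallelism" query parameter that rejects non-positive values. */
    static int parseParallelism(String value) {
        final int parallelism;
        try {
            parallelism = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Parallelism must be an integer but was '" + value + "'.", e);
        }
        if (parallelism <= 0) {
            throw new IllegalArgumentException("Parallelism must be greater than 0 but was " + parallelism + '.');
        }
        return parallelism;
    }

    /** Hypothetical server-side guard at the start of a rescaleJob-style RPC method. */
    static CompletableFuture<Void> rescaleJob(int newParallelism) {
        if (newParallelism <= 0) {
            // Report the failure through the returned future instead of throwing.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException(
                "Parallelism must be greater than 0 but was " + newParallelism + '.'));
            return failed;
        }
        // ... the actual rescaling would start here ...
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        System.out.println(parseParallelism("4"));                    // prints 4
        System.out.println(rescaleJob(0).isCompletedExceptionally()); // prints true
        System.out.println(rescaleJob(2).isCompletedExceptionally()); // prints false
    }
}
```

Returning an exceptionally completed future, rather than throwing, keeps the server-side check consistent with how the quoted `rescaleOperators` reports failures via `FutureUtils.completedExceptionally`.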


---



2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169918052
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> rescaleJob(
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            allOperators.add(jobVertex.getID());
+        }
+
+        return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> rescaleOperators(
+            Collection<JobVertexID> operators,
+            int newParallelism,
+            RescalingBehaviour rescalingBehaviour,
+            Time timeout) {
+        // 1. Check whether we can rescale the job & rescale the respective vertices
+        for (JobVertexID jobVertexId : operators) {
+            final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+            // update max parallelism in case that it has not been configured
+            final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+            if (executionJobVertex != null) {
+                jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+            }
+
+            try {
+                rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+            } catch (FlinkException e) {
+                final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
+
+                log.info(msg, e);
+
+                return FutureUtils.completedExceptionally(
+                    new JobModificationException(msg, e));
+            }
+        }
+
+        final ExecutionGraph currentExecutionGraph = executionGraph;
+
+        final ExecutionGraph newExecutionGraph;
+
+        try {
+            newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+                null,
+                jobGraph,
+                jobMasterConfiguration.getConfiguration(),
+                scheduledExecutorService,
+                scheduledExecutorService,
+                slotPool.getSlotProvider(),
+                userCodeLoader,
+                highAvailabilityServices.getCheckpointRecoveryFactory(),
+                rpcTimeout,
+                currentExecutionGraph.getRestartStrategy(),
+                jobMetricGroup,
+                1,
+                blobServer,
+                jobMasterConfiguration.getSlotRequestTimeout(),
+                log);
+        } catch (JobExecutionException | JobException e) {
+            return FutureUtils.completedExceptionally(
+                new JobModificationException("Could not create rescaled ExecutionGraph.", e));
+        }
+
+        // 3. disable checkpoint coordinator to suppress subsequent checkpoints
+        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        // 4. take a savepoint
+        final CompletableFuture<String> savepointFuture = triggerSavepoint(
+            jobMasterConfiguration.getTmpDirectory(),
+            timeout);
+
+        final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
+            .thenApplyAsync(
+                (String savepointPath) -> {
+                    try {
+                        newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
+                            savepointPath,
+                            false,
+                            newExecutionGraph.getAllVertices(),
+                            userCodeLoader);
+                    } catch (Exception e) {

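The quoted fragment breaks off at `catch (Exception e) {`, so its actual error handling is not visible here. Because a function passed to `thenApplyAsync` cannot throw checked exceptions, one common pattern is to wrap them in `CompletionException`. The following minimal sketch assumes that pattern; `RestoreAfterSavepoint` and its `restoreSavepoint` method are hypothetical stand-ins, not Flink's `CheckpointCoordinator` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: restoreSavepoint stands in for a restore call that
// declares a checked exception; it is not Flink's API.
public class RestoreAfterSavepoint {

    public static String restoreSavepoint(String savepointPath) throws Exception {
        if (savepointPath.isEmpty()) {
            throw new Exception("empty savepoint path");
        }
        return "restored from " + savepointPath;
    }

    // Once the savepoint future yields a path, restore from it on the given executor.
    public static CompletableFuture<String> restoreAsync(
            CompletableFuture<String> savepointFuture, Executor executor) {
        return savepointFuture.thenApplyAsync(
            (String savepointPath) -> {
                try {
                    return restoreSavepoint(savepointPath);
                } catch (Exception e) {
                    // Function#apply cannot throw checked exceptions, so wrap the failure;
                    // the returned future then completes exceptionally with it.
                    throw new CompletionException(e);
                }
            },
            executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(
                restoreAsync(CompletableFuture.completedFuture("/tmp/savepoint-1"), executor).join());
            try {
                restoreAsync(CompletableFuture.completedFuture(""), executor).join();
            } catch (CompletionException e) {
                System.out.println("failed: " + e.getCause().getMessage());
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

With this wrapping, `join()` rethrows the `CompletionException` and the original checked exception is available via `getCause()`.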
[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169918959
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.runtime.rest.messages.json.SerializedThrowableDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer;
+import org.apache.flink.util.SerializedThrowable;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+/**
+ * Basic information object for asynchronous operations.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class AsynchronousOperationInfo {
+
+   private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause";
+
+   @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
+   @JsonSerialize(using = SerializedThrowableSerializer.class)
+   @Nullable
+   private final SerializedThrowable failureCause;
+
+   private AsynchronousOperationInfo(
+   @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
--- End diff --

nit: indentation


---


[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169913228
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -963,6 +1120,36 @@ private Acknowledge suspendExecution(final Exception cause) {
return Acknowledge.get();
}
 
+   /**
+* Schedules the execution of the current {@link ExecutionGraph}.
+*/
+   private void scheduleExecutionGraph() {
+   try {
+   executionGraph.scheduleForExecution();
+   }
+   catch (Throwable t) {
+   executionGraph.failGlobal(t);
+   }
+   }
+
+   /**
+* Dispose the savepoint stored under the given path.
+*
+* @param savepointPath path where the savepoint is stored
+*/
+   private void disposeSavepoint(String savepointPath) {
+   try {
+   // delete the temporary savepoint
+   Checkpoints.disposeSavepoint(
+   savepointPath,
+   jobMasterConfiguration.getConfiguration(),
+   userCodeLoader,
+   log);
+   } catch (FlinkException | IOException de) {
--- End diff --

Why `de`? Just want to make sure it's not a typo.


---


[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169912256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
return CompletableFuture.completedFuture(Acknowledge.get());
}
 
+   @Override
+   public CompletableFuture rescaleJob(
+   int newParallelism,
+   RescalingBehaviour rescalingBehaviour,
+   Time timeout) {
+   final ArrayList allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+   for (JobVertex jobVertex : jobGraph.getVertices()) {
+   allOperators.add(jobVertex.getID());
+   }
+
+   return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+   }
+
+   @Override
+   public CompletableFuture rescaleOperators(
+   Collection operators,
+   int newParallelism,
+   RescalingBehaviour rescalingBehaviour,
+   Time timeout) {
+   // 1. Check whether we can rescale the job & rescale the respective vertices
+   for (JobVertexID jobVertexId : operators) {
+   final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+   // update max parallelism in case that it has not been configure
--- End diff --

nit: *configured*


---


[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169911850
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
return CompletableFuture.completedFuture(Acknowledge.get());
}
 
+   @Override
+   public CompletableFuture rescaleJob(
+   int newParallelism,
+   RescalingBehaviour rescalingBehaviour,
+   Time timeout) {
+   final ArrayList allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+   for (JobVertex jobVertex : jobGraph.getVertices()) {
+   allOperators.add(jobVertex.getID());
+   }
+
+   return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+   }
+
+   @Override
+   public CompletableFuture rescaleOperators(
--- End diff --

Does it have to be part of the `JobMasterGateway` interface? It's only used from `rescaleJob`.


---


[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169920801
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}.
+ */
+public class RescalingStatusMessageParameters extends JobMessageParameters {
+
+   public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter();
+
+   @Override
+   public Collection getPathParameters() {
+   return Arrays.asList(jobPathParameter, triggerIdPathParameter);
--- End diff --

It's not very important, but the `List` returned by `Arrays.asList` is mutable: it has a fixed size, but `set` still replaces elements:

```
public static void main(String[] args) {
    final List<String> strings = Arrays.asList("0", "1", "2");
    // allowed: Arrays.asList returns a fixed-size but writable list
    strings.set(2, "3");

    System.out.println(strings);
    System.out.flush();

    // the unmodifiable view rejects every mutation, including set
    final List<String> strings2 = Collections.unmodifiableList(strings);
    strings2.set(2, "2");
}
```

output
```
[0, 1, 3]
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableList.set(Collections.java:1311)
at org.apache.flink.runtime.dispatcher.Dispatcher.main(Dispatcher.java:347)
```
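If the returned collection is meant to be read-only, one option is to wrap it in `Collections.unmodifiableList`. A minimal sketch follows; the class name and the string keys are hypothetical placeholders for the real `MessagePathParameter` objects that `getPathParameters` returns.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for RescalingStatusMessageParameters#getPathParameters:
// plain strings replace the real path parameter objects.
public class UnmodifiablePathParameters {

    public static List<String> getPathParameters() {
        // Arrays.asList alone would still allow set(); the unmodifiable view rejects it.
        return Collections.unmodifiableList(Arrays.asList("jobid", "triggerid"));
    }

    // Returns true if set(...) is supported, false if it throws UnsupportedOperationException.
    // Assumes a non-empty list.
    public static boolean supportsSet(List<String> list) {
        try {
            list.set(0, list.get(0));
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(supportsSet(Arrays.asList("0", "1", "2"))); // true
        System.out.println(supportsSet(getPathParameters()));          // false
    }
}
```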


---


[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169632755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+/**
+ * Definition of the rescaling behaviour.
+ */
+public enum RescalingBehaviour implements BiConsumerWithException {
+   // rescaling is only executed if the operator can be set to the given parallelism
+   STRICT {
+   @Override
+   public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
+   if (jobVertex.getMaxParallelism() < newParallelism) {
+   throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
+   " because its maximum parallelism " + jobVertex.getMaxParallelism() +
+   " is smaller than the new parallelism " + newParallelism + '.');
+   } else {
+   jobVertex.setParallelism(newParallelism);
+   }
+   }
+   },
+   // the new parallelism will be the minimum of the given parallelism and the maximum parallelism
+   RELAXED {
--- End diff --

Are you going to make this configurable in a follow-up?
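The two behaviours quoted above can be modelled without Flink types. In this sketch `Vertex` is a hypothetical stand-in for `JobVertex` and `IllegalStateException` replaces `FlinkException`. The `RELAXED` body is an assumption derived from its comment, since the diff is cut off before it.

```java
// Hypothetical, Flink-free model of the STRICT and RELAXED rescaling behaviours.
public class RescalingBehaviourSketch {

    // Stand-in for JobVertex with only the fields the behaviours need.
    public static final class Vertex {
        public final String name;
        public final int maxParallelism;
        public int parallelism;

        public Vertex(String name, int maxParallelism, int parallelism) {
            this.name = name;
            this.maxParallelism = maxParallelism;
            this.parallelism = parallelism;
        }
    }

    public enum Behaviour {
        // Only rescale if the vertex can run at exactly the requested parallelism.
        STRICT {
            @Override
            public void apply(Vertex vertex, int newParallelism) {
                if (vertex.maxParallelism < newParallelism) {
                    throw new IllegalStateException("Cannot rescale vertex " + vertex.name
                        + " because its maximum parallelism " + vertex.maxParallelism
                        + " is smaller than the new parallelism " + newParallelism + '.');
                }
                vertex.parallelism = newParallelism;
            }
        },
        // Use the minimum of the requested and the maximum parallelism (assumed from the comment).
        RELAXED {
            @Override
            public void apply(Vertex vertex, int newParallelism) {
                vertex.parallelism = Math.min(newParallelism, vertex.maxParallelism);
            }
        };

        public abstract void apply(Vertex vertex, int newParallelism);
    }

    public static void main(String[] args) {
        Vertex vertex = new Vertex("map", 4, 1);
        Behaviour.RELAXED.apply(vertex, 8);
        System.out.println(vertex.parallelism); // 4

        try {
            Behaviour.STRICT.apply(vertex, 8);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```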


---


[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169917224
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
return CompletableFuture.completedFuture(Acknowledge.get());
}
 
+   @Override
+   public CompletableFuture rescaleJob(
+   int newParallelism,
+   RescalingBehaviour rescalingBehaviour,
+   Time timeout) {
+   final ArrayList allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+   for (JobVertex jobVertex : jobGraph.getVertices()) {
+   allOperators.add(jobVertex.getID());
+   }
+
+   return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+   }
+
+   @Override
+   public CompletableFuture rescaleOperators(
+   Collection operators,
+   int newParallelism,
+   RescalingBehaviour rescalingBehaviour,
+   Time timeout) {
+   // 1. Check whether we can rescale the job & rescale the respective vertices
+   for (JobVertexID jobVertexId : operators) {
+   final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+   // update max parallelism in case that it has not been configure
+   final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+   if (executionJobVertex != null) {
+   jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+   }
+
+   try {
+   rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+   } catch (FlinkException e) {
+   final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
+
+   log.info(msg, e);
+
+   return FutureUtils.completedExceptionally(
+   new JobModificationException(msg, e));
+   }
+   }
+
+   final ExecutionGraph currentExecutionGraph = executionGraph;
+
+   final ExecutionGraph newExecutionGraph;
+
+   try {
+   newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+   null,
+   jobGraph,
+   jobMasterConfiguration.getConfiguration(),
+   scheduledExecutorService,
+   scheduledExecutorService,
+   slotPool.getSlotProvider(),
+   userCodeLoader,
+   highAvailabilityServices.getCheckpointRecoveryFactory(),
+   rpcTimeout,
+   currentExecutionGraph.getRestartStrategy(),
+   jobMetricGroup,
+   1,
+   blobServer,
+   jobMasterConfiguration.getSlotRequestTimeout(),
+   log);
+   } catch (JobExecutionException | JobException e) {
+   return FutureUtils.completedExceptionally(
+   new JobModificationException("Could not create rescaled ExecutionGraph.", e));
+   }
+
+   // 3. disable checkpoint coordinator to suppress subsequent checkpoints
+   final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+   checkpointCoordinator.stopCheckpointScheduler();
+
+   // 4. take a savepoint
+   final CompletableFuture savepointFuture = triggerSavepoint(
+   jobMasterConfiguration.getTmpDirectory(),
+   timeout);
+
+   final CompletableFuture executionGraphFuture = savepointFuture
+   .thenApplyAsync(
+   (String savepointPath) -> {
+   try {
+   newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
+   savepointPath,
+   false,
+   newExecutionGraph.getAllVertices(),
+   userCodeLoader);
+   } catch (Exception e) {

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5487#discussion_r169917845
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
return CompletableFuture.completedFuture(Acknowledge.get());
}
 
+   @Override
+   public CompletableFuture rescaleJob(
+   int newParallelism,
+   RescalingBehaviour rescalingBehaviour,
+   Time timeout) {
+   final ArrayList allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+   for (JobVertex jobVertex : jobGraph.getVertices()) {
+   allOperators.add(jobVertex.getID());
+   }
+
+   return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+   }
+
+   @Override
+   public CompletableFuture rescaleOperators(
+   Collection operators,
+   int newParallelism,
+   RescalingBehaviour rescalingBehaviour,
+   Time timeout) {
+   // 1. Check whether we can rescale the job & rescale the respective vertices
+   for (JobVertexID jobVertexId : operators) {
+   final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+   // update max parallelism in case that it has not been configure
+   final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+   if (executionJobVertex != null) {
+   jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+   }
+
+   try {
+   rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+   } catch (FlinkException e) {
+   final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
+
+   log.info(msg, e);
+
+   return FutureUtils.completedExceptionally(
+   new JobModificationException(msg, e));
+   }
+   }
+
+   final ExecutionGraph currentExecutionGraph = executionGraph;
+
+   final ExecutionGraph newExecutionGraph;
+
+   try {
+   newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+   null,
+   jobGraph,
+   jobMasterConfiguration.getConfiguration(),
+   scheduledExecutorService,
+   scheduledExecutorService,
+   slotPool.getSlotProvider(),
+   userCodeLoader,
+   highAvailabilityServices.getCheckpointRecoveryFactory(),
+   rpcTimeout,
+   currentExecutionGraph.getRestartStrategy(),
+   jobMetricGroup,
+   1,
+   blobServer,
+   jobMasterConfiguration.getSlotRequestTimeout(),
+   log);
+   } catch (JobExecutionException | JobException e) {
+   return FutureUtils.completedExceptionally(
+   new JobModificationException("Could not create rescaled ExecutionGraph.", e));
+   }
+
+   // 3. disable checkpoint coordinator to suppress subsequent checkpoints
+   final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+   checkpointCoordinator.stopCheckpointScheduler();
+
+   // 4. take a savepoint
+   final CompletableFuture savepointFuture = triggerSavepoint(
+   jobMasterConfiguration.getTmpDirectory(),
+   timeout);
+
+   final CompletableFuture executionGraphFuture = savepointFuture
+   .thenApplyAsync(
+   (String savepointPath) -> {
+   try {
+   newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
+   savepointPath,
+   false,
+   newExecutionGraph.getAllVertices(),
+   userCodeLoader);
+   } catch (Exception e) {

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-14 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5487

[FLINK-8656] [flip6] Add modify CLI command to rescale Flink jobs

## What is the purpose of the change

Jobs can now be rescaled by calling `flink modify` with the job ID and the new
parallelism (`-p`). Internally, the CliFrontend sends the corresponding REST
call and polls for status updates.
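The trigger-then-poll flow described above can be sketched independently of the REST layer. In this minimal sketch the `OperationStatus` enum and the simulated status responses are hypothetical; they are not the actual CliFrontend or `RestClusterClient` code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical sketch of polling an asynchronous operation until it completes.
public class TriggerAndPoll {

    public enum OperationStatus { IN_PROGRESS, COMPLETED }

    // Polls statusSupplier until it reports COMPLETED; returns the number of polls needed.
    public static int pollUntilCompleted(
            Supplier<OperationStatus> statusSupplier, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (statusSupplier.get() == OperationStatus.COMPLETED) {
                return attempt;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling.", e);
            }
        }
        throw new IllegalStateException("Operation did not complete after " + maxAttempts + " polls.");
    }

    public static void main(String[] args) {
        // Simulated status endpoint: IN_PROGRESS twice, then COMPLETED.
        Iterator<OperationStatus> responses = Arrays.asList(
            OperationStatus.IN_PROGRESS, OperationStatus.IN_PROGRESS, OperationStatus.COMPLETED).iterator();
        System.out.println("completed after " + pollUntilCompleted(responses::next, 10, 10L) + " polls");
    }
}
```

A bounded number of attempts keeps the client from waiting forever if the operation never finishes.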

This PR is based on #5454.

## Brief change log

- Add `modify` call to `CliFrontend`
- Add `ClusterClient#rescaleJob` method with default implementation
- Implement `RestClusterClient#rescaleJob` method to trigger an asynchronous rescale operation via REST and poll for its status updates

## Verifying this change

- Tested manually
- Added `CliFrontendModifyTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs + stdout help)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink rescaleCommand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5487


commit d9159228091cae9ebbd1bb718b69e6cf452881e1
Author: Till Rohrmann 
Date:   2018-02-13T11:41:44Z

[FLINK-8643] [flip6] Use JobManagerOptions#SLOT_REQUEST_TIMEOUT in ExecutionGraph

This commit changes the initialization of the ExecutionGraph to use the
JobManagerOptions#SLOT_REQUEST_TIMEOUT for the slot allocation. Furthermore,
it changes the behaviour of the SlotPool#ProviderAndOwner implementation such
that the timeout is given to it via the SlotProvider#allocateSlot call.

commit 19780c9d284914ec51e92231536315299a3c2da3
Author: Till Rohrmann 
Date:   2018-02-13T12:18:01Z

[hotfix] [flip6] Remove unnecessary timeout from SlotPool

commit 9924776c92a378cef144c0767f1ff18b799d52e9
Author: Till Rohrmann 
Date:   2018-02-13T14:33:11Z

[FLINK-8647] [flip6] Introduce JobMasterConfiguration

This commit introduces a JobMasterConfiguration, which contains JobMaster-specific
configuration settings.

commit fde75841de2e27cb7380f3a28066a99e2c1a690d
Author: zentol 
Date:   2018-01-23T12:50:32Z

[FLINK-8475][config][docs] Integrate HA-ZK options

This closes #5462.

commit 788a17fdbd4aaf3429ead4491ede197fc775b1f0
Author: zentol 
Date:   2018-01-23T13:04:36Z

[FLINK-8475][config][docs] Integrate YARN options

This closes #5463.

commit fcd783358c282e61bf12e0c18298c237c85a6695
Author: Till Rohrmann 
Date:   2018-02-13T15:08:38Z

[hotfix] [tests] Simplify JobMasterTest

commit 8206e6f13809c0b60bfaf776bc386088f535e723
Author: Till Rohrmann 
Date:   2018-02-13T15:10:09Z

[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints

Let the JobMaster respect checkpoints and savepoints. The JobMaster will always
try to restore the latest checkpoint if one is available. Next, it will check
whether savepoint restore settings have been set. If so, it will try to restore
the savepoint. Only if neither is available will the job be started from
scratch.
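Read as a fallback chain, the restore order above can be sketched as follows. The inputs are hypothetical placeholders (an optional latest-checkpoint ID and an optional savepoint path), and the sketch assumes the savepoint is only considered when no checkpoint is available.

```java
import java.util.Optional;

// Hypothetical sketch of the restore precedence described in the commit message.
public class RestoreOrder {

    // Latest checkpoint wins; otherwise the configured savepoint; otherwise start from scratch.
    public static String chooseRestoreSource(Optional<String> latestCheckpoint, Optional<String> savepointPath) {
        if (latestCheckpoint.isPresent()) {
            return "checkpoint " + latestCheckpoint.get();
        }
        if (savepointPath.isPresent()) {
            return "savepoint " + savepointPath.get();
        }
        return "scratch";
    }

    public static void main(String[] args) {
        System.out.println(chooseRestoreSource(Optional.of("chk-7"), Optional.of("sp-1"))); // checkpoint chk-7
        System.out.println(chooseRestoreSource(Optional.empty(), Optional.of("sp-1")));     // savepoint sp-1
        System.out.println(chooseRestoreSource(Optional.empty(), Optional.empty()));        // scratch
    }
}
```

Preferring the latest checkpoint means that a job which already made progress after a savepoint restore does not fall back to the older savepoint on recovery.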

commit 057a95b7328b1cca7b78bf1dd25e8d048df70410
Author: Till Rohrmann 
Date:   2018-02-13T15:11:37Z

[hotfix] Fix checkstyle violations in ExecutionGraph

commit 9930b0991320bcff268ca82db6378df8976560dc
Author: Till Rohrmann 
Date:   2018-02-13T15:12:41Z

[FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph

The new JobStatus#SUSPENDING indicates that an ExecutionGraph has been suspended
but its cleanup has not been done yet. Only after all Executions have been
canceled will the ExecutionGraph enter the SUSPENDED state and complete its
termination future accordingly.
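The two-phase suspension described above can be modelled as a small state machine. This is a single-threaded sketch with hypothetical names (`SuspendingSketch`, `onExecutionCanceled`). It only mirrors the described transitions, not the actual ExecutionGraph implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, single-threaded model of RUNNING -> SUSPENDING -> SUSPENDED.
public class SuspendingSketch {

    public enum Status { RUNNING, SUSPENDING, SUSPENDED }

    private final CompletableFuture<Status> terminationFuture = new CompletableFuture<>();
    private Status status = Status.RUNNING;
    private int activeExecutions;

    public SuspendingSketch(int activeExecutions) {
        this.activeExecutions = activeExecutions;
    }

    // Enter SUSPENDING; cancellation of the executions would be triggered here.
    public void suspend() {
        if (status == Status.RUNNING) {
            status = Status.SUSPENDING;
            maybeFinishSuspension();
        }
    }

    // Called once per execution that has finished canceling.
    public void onExecutionCanceled() {
        if (activeExecutions > 0) {
            activeExecutions--;
        }
        maybeFinishSuspension();
    }

    // Only when no execution is left does the graph become SUSPENDED and complete the future.
    private void maybeFinishSuspension() {
        if (status == Status.SUSPENDING && activeExecutions == 0) {
            status = Status.SUSPENDED;
            terminationFuture.complete(status);
        }
    }

    public Status getStatus() {
        return status;
    }

    public CompletableFuture<Status> getTerminationFuture() {
        return terminationFuture;
    }

    public static void main(String[] args) {
        SuspendingSketch graph = new SuspendingSketch(2);
        graph.suspend();
        System.out.println(graph.getStatus());                     // SUSPENDING
        graph.onExecutionCanceled();
        graph.onExecutionCanceled();
        System.out.println(graph.getStatus());                     // SUSPENDED
        System.out.println(graph.getTerminationFuture().isDone()); // true
    }
}
```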

commit 6c51ad306c90464572353168ecafdb962794747e
Author: Till Rohrmann 
Date:   2018-02-13T15:14:41Z

[FLINK-8629] [flip6] Allow JobMaster to rescale jobs

This commit adds to the JobMaster the functionality to rescale a job or parts of
it. In order to rescale a job, the JobMaster does the following:
1. Take a