[GitHub] [flink] aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment

2019-09-04 Thread GitBox
aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment
URL: https://github.com/apache/flink/pull/9607#discussion_r320773696
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
 ##
 @@ -178,65 +139,65 @@ public boolean isRunning() {
 */
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
-   if (plan == null) {
-   throw new IllegalArgumentException("The plan may not be null.");
-   }
+   checkNotNull(plan);
 
-   synchronized (this.lock) {
+   final Configuration jobExecutorServiceConfiguration = configureExecution(plan);
 
-   // check if we start a session dedicated for this execution
-   final boolean shutDownAtEnd;
+   try (final JobExecutorService executorService = createJobExecutorService(jobExecutorServiceConfiguration)) {
 
-   if (jobExecutorService == null) {
-   shutDownAtEnd = true;
+   Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
+   OptimizedPlan op = pc.compile(plan);
 
-   // configure the number of local slots equal to the parallelism of the local plan
-   if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
-   int maxParallelism = plan.getMaximumParallelism();
-   if (maxParallelism > 0) {
-   this.taskManagerNumSlots = maxParallelism;
-   }
-   }
+   JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
+   JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
 
-   // start the cluster for us
-   start();
-   }
-   else {
-   // we use the existing session
-   shutDownAtEnd = false;
+   return executorService.executeJobBlocking(jobGraph);
+   }
+   }
+
+   private Configuration configureExecution(final Plan plan) {
+   setNumberOfTaskSlots(plan);
+   final Configuration executorConfiguration = createExecutorServiceConfig();
+   setPlanParallelism(plan, executorConfiguration);
+   return executorConfiguration;
+   }
+
+   private void setNumberOfTaskSlots(final Plan plan) {
+   if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
+   int maxParallelism = plan.getMaximumParallelism();
+   if (maxParallelism > 0) {
+   this.taskManagerNumSlots = maxParallelism;
}
+   }
+   }
 
-   try {
-   // TODO: Set job's default parallelism to max number of slots
-   final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
-   final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-   plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
+   private Configuration createExecutorServiceConfig() {
+   final Configuration newConfiguration = new Configuration();
+   newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
+   newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, defaultOverwriteFiles);
 
 Review comment:
   I only see this setter used in a single test. Maybe we can remove it as 
well, or at least stop keeping a field for this.
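If `createExecutorServiceConfig()` applies `baseConfiguration` after its own values (the fuller context of this hunk, quoted in another comment on this PR, ends with `newConfiguration.addAll(baseConfiguration)`), the one test could pass the overwrite flag through the `Configuration` handed to the constructor, and both the setter and the `defaultOverwriteFiles` field could go. Below is a minimal sketch of that ordering. It uses a plain `Map` in place of Flink's `Configuration`; the key string and the `false` default are assumptions, and it assumes `addAll` overwrites existing keys the way `Map.putAll` does.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a Map stands in for Flink's Configuration, and the key string
// is an assumption about what CoreOptions.FILESYTEM_DEFAULT_OVERRIDE maps to.
final class OverwriteConfigSketch {
    static final String OVERWRITE_FILES_KEY = "fs.overwrite-files";

    // Hypothetical executor default, applied first so the base configuration can override it.
    static final boolean DEFAULT_OVERWRITE_FILES = false;

    static Map<String, Object> createExecutorServiceConfig(Map<String, Object> baseConfiguration) {
        Map<String, Object> config = new HashMap<>();
        config.put(OVERWRITE_FILES_KEY, DEFAULT_OVERWRITE_FILES);
        // Added last, so entries from the base configuration win (putAll overwrites existing keys).
        config.putAll(baseConfiguration);
        return config;
    }

    public static void main(String[] args) {
        // What the single test could do instead of calling a setter on the executor.
        Map<String, Object> testConfiguration = new HashMap<>();
        testConfiguration.put(OVERWRITE_FILES_KEY, true);
        System.out.println(createExecutorServiceConfig(testConfiguration).get(OVERWRITE_FILES_KEY)); // prints "true"
    }
}
```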


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment

2019-09-04 Thread GitBox
aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment
URL: https://github.com/apache/flink/pull/9607#discussion_r320775256
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 ##
 @@ -72,15 +71,9 @@ public String getExecutionPlan() throws Exception {
return gen.getOptimizerPlanAsJSON(op);
}
 
-   @Override
-   public void startNewSession() throws Exception {
-   jobID = JobID.generate();
-   }
-
@Override
public String toString() {
-   return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
-   + ") : " + getIdString();
+   return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ").";
 
 Review comment:
   I wouldn't put a `.` here; I don't think it's a sentence.
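For reference, here is a minimal sketch of the string the suggestion leads to, with the trailing `.` dropped so that `toString()` reads as a label. The class is hypothetical, and `PARALLELISM_DEFAULT = -1` is an assumed stand-in for `ExecutionConfig.PARALLELISM_DEFAULT`.

```java
// Hypothetical stand-in for ContextEnvironment, reduced to its toString() logic.
final class ContextEnvironmentToStringSketch {
    // Assumed stand-in for ExecutionConfig.PARALLELISM_DEFAULT.
    static final int PARALLELISM_DEFAULT = -1;

    private final int parallelism;

    ContextEnvironmentToStringSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    @Override
    public String toString() {
        // A label, not a sentence: no trailing period after the closing parenthesis.
        return "Context Environment (parallelism = "
                + (parallelism == PARALLELISM_DEFAULT ? "default" : String.valueOf(parallelism))
                + ")";
    }
}
```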




[GitHub] [flink] aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment

2019-09-04 Thread GitBox
aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment
URL: https://github.com/apache/flink/pull/9607#discussion_r320774343
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
 ##
 @@ -178,65 +139,65 @@ public boolean isRunning() {
 */
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
-   if (plan == null) {
-   throw new IllegalArgumentException("The plan may not be null.");
-   }
+   checkNotNull(plan);
 
-   synchronized (this.lock) {
+   final Configuration jobExecutorServiceConfiguration = configureExecution(plan);
 
-   // check if we start a session dedicated for this execution
-   final boolean shutDownAtEnd;
+   try (final JobExecutorService executorService = createJobExecutorService(jobExecutorServiceConfiguration)) {
 
-   if (jobExecutorService == null) {
-   shutDownAtEnd = true;
+   Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
+   OptimizedPlan op = pc.compile(plan);
 
-   // configure the number of local slots equal to the parallelism of the local plan
-   if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
-   int maxParallelism = plan.getMaximumParallelism();
-   if (maxParallelism > 0) {
-   this.taskManagerNumSlots = maxParallelism;
-   }
-   }
+   JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
+   JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
 
-   // start the cluster for us
-   start();
-   }
-   else {
-   // we use the existing session
-   shutDownAtEnd = false;
+   return executorService.executeJobBlocking(jobGraph);
+   }
+   }
+
+   private Configuration configureExecution(final Plan plan) {
+   setNumberOfTaskSlots(plan);
+   final Configuration executorConfiguration = createExecutorServiceConfig();
+   setPlanParallelism(plan, executorConfiguration);
+   return executorConfiguration;
+   }
+
+   private void setNumberOfTaskSlots(final Plan plan) {
+   if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
+   int maxParallelism = plan.getMaximumParallelism();
+   if (maxParallelism > 0) {
+   this.taskManagerNumSlots = maxParallelism;
}
+   }
+   }
 
-   try {
-   // TODO: Set job's default parallelism to max number of slots
-   final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
-   final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-   plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
+   private Configuration createExecutorServiceConfig() {
+   final Configuration newConfiguration = new Configuration();
+   newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
+   newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, defaultOverwriteFiles);
 
-   Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
-   OptimizedPlan op = pc.compile(plan);
+   newConfiguration.addAll(baseConfiguration);
 
-   JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
-   JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
+   return newConfiguration;
+   }
 
-   return jobExecutorService.executeJobBlocking(jobGraph);
-   }
-   finally {
-   if (shutDownAtEnd) {
-   stop();
-   }
-   }
-   }
+   private void setPlanParallelism(final Plan plan, final Configuration 

[GitHub] [flink] aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment

2019-09-04 Thread GitBox
aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment
URL: https://github.com/apache/flink/pull/9607#discussion_r320774791
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
 ##
 @@ -32,13 +32,14 @@
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.util.NetUtils;
 
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 
 Review comment:
   Nice simplifications in this file.  




[GitHub] [flink] aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment

2019-09-04 Thread GitBox
aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment
URL: https://github.com/apache/flink/pull/9607#discussion_r320773446
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
 ##
 @@ -178,65 +139,65 @@ public boolean isRunning() {
 */
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
-   if (plan == null) {
-   throw new IllegalArgumentException("The plan may not be null.");
-   }
+   checkNotNull(plan);
 
-   synchronized (this.lock) {
+   final Configuration jobExecutorServiceConfiguration = configureExecution(plan);
 
-   // check if we start a session dedicated for this execution
-   final boolean shutDownAtEnd;
+   try (final JobExecutorService executorService = createJobExecutorService(jobExecutorServiceConfiguration)) {
 
-   if (jobExecutorService == null) {
-   shutDownAtEnd = true;
+   Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
+   OptimizedPlan op = pc.compile(plan);
 
-   // configure the number of local slots equal to the parallelism of the local plan
-   if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
-   int maxParallelism = plan.getMaximumParallelism();
-   if (maxParallelism > 0) {
-   this.taskManagerNumSlots = maxParallelism;
-   }
-   }
+   JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
+   JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
 
-   // start the cluster for us
-   start();
-   }
-   else {
-   // we use the existing session
-   shutDownAtEnd = false;
+   return executorService.executeJobBlocking(jobGraph);
+   }
+   }
+
+   private Configuration configureExecution(final Plan plan) {
+   setNumberOfTaskSlots(plan);
+   final Configuration executorConfiguration = createExecutorServiceConfig();
+   setPlanParallelism(plan, executorConfiguration);
+   return executorConfiguration;
+   }
+
+   private void setNumberOfTaskSlots(final Plan plan) {
 
 Review comment:
   This seems superfluous. We set `taskManagerNumSlots` only to use it in 
`createExecutorServiceConfig` to set the configuration. I think this can be 
replaced by a local variable and an argument to `createExecutorServiceConfig()`.
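Below is a minimal sketch of that suggestion. It uses a plain `Map` in place of Flink's `Configuration` and a plain `int` for the plan's maximum parallelism. The class name, the `-1` sentinel for `DEFAULT_TASK_MANAGER_NUM_SLOTS` and the key string are hypothetical. The slot count is computed into a local variable and handed to `createExecutorServiceConfig(int)` as an argument, so no field is written during execution.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of LocalExecutor's configuration path; not Flink code.
final class LocalSlotsSketch {
    // Hypothetical sentinel meaning "not configured"; the real constant's value is not shown in the diff.
    static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
    // Assumed key string behind TaskManagerOptions.NUM_TASK_SLOTS.
    static final String NUM_TASK_SLOTS_KEY = "taskmanager.numberOfTaskSlots";

    private final int configuredNumSlots; // never mutated after construction
    private final Map<String, Object> baseConfiguration;

    LocalSlotsSketch(int configuredNumSlots, Map<String, Object> baseConfiguration) {
        this.configuredNumSlots = configuredNumSlots;
        this.baseConfiguration = baseConfiguration;
    }

    /** Replaces setNumberOfTaskSlots(plan): returns the value instead of writing a field. */
    int numberOfTaskSlots(int planMaxParallelism) {
        if (configuredNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS && planMaxParallelism > 0) {
            return planMaxParallelism;
        }
        return configuredNumSlots;
    }

    /** The slot count arrives as an argument rather than through a field. */
    Map<String, Object> createExecutorServiceConfig(int numTaskSlots) {
        Map<String, Object> config = new HashMap<>();
        config.put(NUM_TASK_SLOTS_KEY, numTaskSlots);
        config.putAll(baseConfiguration); // base configuration still has the last word
        return config;
    }

    Map<String, Object> configureExecution(int planMaxParallelism) {
        final int numTaskSlots = numberOfTaskSlots(planMaxParallelism); // local variable, per the review
        return createExecutorServiceConfig(numTaskSlots);
    }
}
```

As far as the quoted diff shows, writing the field also has a side effect: once `taskManagerNumSlots` has been set from one plan it no longer equals the default, so a later `executePlan()` call on the same executor would reuse that plan's slot count. The local-variable version recomputes it for every plan.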




[GitHub] [flink] aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment

2019-09-04 Thread GitBox
aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment
URL: https://github.com/apache/flink/pull/9607#discussion_r320769875
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
 ##
 @@ -77,11 +70,11 @@
// 

 
public LocalExecutor() {
-   this(null);
+   this(new Configuration());
}
 
public LocalExecutor(Configuration conf) {
-   this.baseConfiguration = conf != null ? conf : new Configuration();
+   this.baseConfiguration = checkNotNull(conf);
 
 Review comment:
   I think you can only omit the `checkNotNull()` calls when your build system 
verifies that the `@Nonnull` contract is actually followed. Outside code that 
calls your function might still pass in a `null`.
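Here is a minimal sketch of the point. It uses `java.util.Objects.requireNonNull` as a JDK stand-in for Flink's `Preconditions.checkNotNull` (both throw a `NullPointerException`) and a `Map` in place of `Configuration`; the class is hypothetical. Nothing at runtime stops the `null` in `main`; the explicit check makes the constructor fail right there instead of storing a `null` field that breaks later, far from the caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for LocalExecutor's constructors; not Flink code.
final class NullCheckSketch {
    private final Map<String, String> baseConfiguration;

    NullCheckSketch() {
        this(new HashMap<>()); // mirrors this(new Configuration())
    }

    NullCheckSketch(Map<String, String> conf) {
        // A @Nonnull annotation alone is not enforced at runtime, so validate at the public boundary.
        this.baseConfiguration = Objects.requireNonNull(conf, "conf");
    }

    int size() {
        return baseConfiguration.size();
    }

    public static void main(String[] args) {
        try {
            new NullCheckSketch(null); // what outside code can still do
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage()); // prints "rejected: conf"
        }
    }
}
```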

