[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2018-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359662#comment-16359662
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4510


> Allow to rescale JobGraph on JobManager
> ---
>
> Key: FLINK-7124
> URL: https://issues.apache.org/jira/browse/FLINK-7124
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Reporter: Till Rohrmann
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.5.0
>
>
> In order to support dynamic scaling, the {{JobManager}} has to be able to 
> change the parallelism of the submitted {{JobGraph}}. This means that we must 
> be able to change the parallelism settings on the cluster side.
> We already have the functionality to change the parallelism if it 
> was set to {{ExecutionConfig.PARALLELISM_AUTO_MAX}}. Therefore, I think the 
> task is mostly about making sure that things work properly when 
> requesting a parallelism change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337902#comment-16337902
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4510
  
@tzulitai is anything left to do here?



[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156061#comment-16156061
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137395149
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+    private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+    @Test
+    public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
+
+        final Configuration config = new Configuration();
+
+        final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+        final JobGraph jobGraph = new JobGraph(jobVertices);
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+        // TODO this limitation should be removed.
+        for (JobVertex jv : jobVertices) {
+            jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+        }
+
+        ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
+            null,
+            jobGraph,
+            config,
+            TestingUtils.defaultExecutor(),
+            TestingUtils.defaultExecutor(),
+            new Scheduler(TestingUtils.defaultExecutionContext()),
+            Thread.currentThread().getContextClassLoader(),
+            new StandaloneCheckpointRecoveryFactory(),
+            AkkaUtils.getDefaultTimeout(),
+            new NoRestartStrategy(),
+            new UnregisteredMetricsGroup(),
+            5,
+            TEST_LOGGER);
+
+        for (JobVertex jv : jobVertices) {
+            assertEquals(5, jv.getParallelism());
+        }
+        verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices);
+
+        // --- verify scaling up works correctly ---
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is 

[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16154978#comment-16154978
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137199417
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+    private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+    @Test
+    public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
+
+        final Configuration config = new Configuration();
+
+        final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+        final JobGraph jobGraph = new JobGraph(jobVertices);
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+        // TODO this limitation should be removed.
--- End diff --

Ok makes sense. I'll include that here.



[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16154971#comment-16154971
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137197996
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+    private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+    @Test
+    public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
+
+        final Configuration config = new Configuration();
+
+        final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+        final JobGraph jobGraph = new JobGraph(jobVertices);
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+        // TODO this limitation should be removed.
+        for (JobVertex jv : jobVertices) {
+            jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+        }
+
+        ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
+            null,
+            jobGraph,
+            config,
+            TestingUtils.defaultExecutor(),
+            TestingUtils.defaultExecutor(),
+            new Scheduler(TestingUtils.defaultExecutionContext()),
+            Thread.currentThread().getContextClassLoader(),
+            new StandaloneCheckpointRecoveryFactory(),
+            AkkaUtils.getDefaultTimeout(),
+            new NoRestartStrategy(),
+            new UnregisteredMetricsGroup(),
+            5,
+            TEST_LOGGER);
+
+        for (JobVertex jv : jobVertices) {
+            assertEquals(5, jv.getParallelism());
+        }
+        verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices);
+
+        // --- verify scaling up works correctly ---
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is 

[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16154969#comment-16154969
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137197610
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+    private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+    @Test
+    public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
+
+        final Configuration config = new Configuration();
+
+        final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+        final JobGraph jobGraph = new JobGraph(jobVertices);
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+        // TODO this limitation should be removed.
--- End diff --

I think this can be included in this PR because that's what the scope of 
the issue is about.



[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16154965#comment-16154965
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137196450
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+    private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+    @Test
+    public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
+
+        final Configuration config = new Configuration();
+
+        final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+        final JobGraph jobGraph = new JobGraph(jobVertices);
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+        // TODO this limitation should be removed.
--- End diff --

AFAIK, the only limitation causing this is how the parallelism is verified 
in `ExecutionGraphBuilder.buildGraph`. Though I'm not sure of why we needed 
that verification in the first place, it should be fairly simple to remove this 
limitation. Was planning to do that as a separate PR, while using this PR to 
first get some consensus on this.
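
To make the limitation concrete, here is a minimal, self-contained sketch of the two behaviors under discussion. It does not use any Flink classes: `Vertex`, `applyRestricted`, and `applyRelaxed` are hypothetical names invented for illustration, and the local `PARALLELISM_AUTO_MAX` constant only stands in for the sentinel role of `ExecutionConfig.PARALLELISM_AUTO_MAX` (its value here is an assumption). How `ExecutionGraphBuilder.buildGraph` actually verifies the parallelism may well differ from this.

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelismSketch {

    // Assumption: stands in for the sentinel role of ExecutionConfig.PARALLELISM_AUTO_MAX.
    public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;

    /** Hypothetical stand-in for a job vertex; it only tracks a parallelism value. */
    public static final class Vertex {
        private int parallelism;

        public Vertex(int parallelism) {
            this.parallelism = parallelism;
        }

        public int getParallelism() {
            return parallelism;
        }

        public void setParallelism(int parallelism) {
            this.parallelism = parallelism;
        }
    }

    /** Restricted variant: only vertices still set to the sentinel are rescaled. */
    public static void applyRestricted(List<Vertex> vertices, int target) {
        checkTarget(target);
        for (Vertex v : vertices) {
            if (v.getParallelism() == PARALLELISM_AUTO_MAX) {
                v.setParallelism(target);
            }
        }
    }

    /** Relaxed variant: every vertex is rescaled, whatever it was configured with. */
    public static void applyRelaxed(List<Vertex> vertices, int target) {
        checkTarget(target);
        for (Vertex v : vertices) {
            v.setParallelism(target);
        }
    }

    // Rejects targets that are not a concrete, positive parallelism.
    private static void checkTarget(int target) {
        if (target <= 0 || target == PARALLELISM_AUTO_MAX) {
            throw new IllegalArgumentException("target parallelism must be a concrete positive value");
        }
    }

    public static void main(String[] args) {
        List<Vertex> vertices = new ArrayList<>();
        vertices.add(new Vertex(PARALLELISM_AUTO_MAX)); // left for the cluster to decide
        vertices.add(new Vertex(3));                    // explicitly configured

        applyRestricted(vertices, 5);
        System.out.println(vertices.get(0).getParallelism() + ", " + vertices.get(1).getParallelism());

        applyRelaxed(vertices, 7);
        System.out.println(vertices.get(0).getParallelism() + ", " + vertices.get(1).getParallelism());
    }
}
```

In this sketch the restricted variant leaves the explicitly configured vertex untouched, which is why the test in the diff first sets every vertex to the sentinel. The relaxed variant is one possible reading of "removing the limitation"; it is not taken from the PR.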



[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16154961#comment-16154961
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137196004
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+    private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+    @Test
+    public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
+
+        final Configuration config = new Configuration();
+
+        final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+        final JobGraph jobGraph = new JobGraph(jobVertices);
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+        // TODO this limitation should be removed.
+        for (JobVertex jv : jobVertices) {
+            jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+        }
+
+        ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
+            null,
+            jobGraph,
+            config,
+            TestingUtils.defaultExecutor(),
+            TestingUtils.defaultExecutor(),
+            new Scheduler(TestingUtils.defaultExecutionContext()),
+            Thread.currentThread().getContextClassLoader(),
+            new StandaloneCheckpointRecoveryFactory(),
+            AkkaUtils.getDefaultTimeout(),
+            new NoRestartStrategy(),
+            new UnregisteredMetricsGroup(),
+            5,
+            TEST_LOGGER);
+
+        for (JobVertex jv : jobVertices) {
+            assertEquals(5, jv.getParallelism());
+        }
+        verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices);
+
+        // --- verify scaling up works correctly ---
+
+        // TODO rescaling the JobGraph is currently only supported if the
+        // TODO configured parallelism is 

[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16153755#comment-16153755
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137005950
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+   private static final Logger TEST_LOGGER = 
LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+   @Test
+   public void testExecutionGraphArbitraryDopConstructionTest() throws 
Exception {
+
+   final Configuration config = new Configuration();
+
+   final JobVertex[] jobVertices = 
createVerticesForSimpleBipartiteJobGraph();
+   final JobGraph jobGraph = new JobGraph(jobVertices);
+
+   // TODO rescaling the JobGraph is currently only supported if 
the
+   // TODO configured parallelism is 
ExecutionConfig.PARALLELISM_AUTO_MAX.
+   // TODO this limitation should be removed.
+   for (JobVertex jv : jobVertices) {
+   jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+   }
+
+   ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
+   null,
+   jobGraph,
+   config,
+   TestingUtils.defaultExecutor(),
+   TestingUtils.defaultExecutor(),
+   new 
Scheduler(TestingUtils.defaultExecutionContext()),
+   Thread.currentThread().getContextClassLoader(),
+   new StandaloneCheckpointRecoveryFactory(),
+   AkkaUtils.getDefaultTimeout(),
+   new NoRestartStrategy(),
+   new UnregisteredMetricsGroup(),
+   5,
+   TEST_LOGGER);
+
+   for (JobVertex jv : jobVertices) {
+   assertEquals(5, jv.getParallelism());
+   }
+   verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices);
+
+   // --- verify scaling up works correctly ---
+
+   // TODO rescaling the JobGraph is currently only supported if the
+   // TODO configured parallelism is 

[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16153754#comment-16153754
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137005513
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+   private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+   @Test
+   public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+   final JobGraph jobGraph = new JobGraph(jobVertices);
+
+   // TODO rescaling the JobGraph is currently only supported if the
+   // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+   // TODO this limitation should be removed.
--- End diff --

Why is this the case? I think we should remove this limitation as stated by 
your TODO.




[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121120#comment-16121120
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4510
  
R: @tillrohrmann 




[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121116#comment-16121116
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4510

[FLINK-7124] [flip-6] Add test to verify rescaling JobGraphs works correctly

## What is the purpose of the change

This pull request adds a test to verify that rescaling `JobGraph`s to arbitrary valid DOPs (degrees of parallelism) works correctly, such that the generated `ExecutionGraph` of each rescale is correct.

## Brief change log

- Add `ExecutionGraphRescalingTest` class.
  - Contains a test that consecutively rescales (up and down) a given `JobGraph`.
  - Contains a test that rescales to a DOP beyond max parallelism. This test is ignored for now (`TODO`) since it doesn't fail as expected.
- `[hotfix]` Refactor `ExecutionGraphConstructionTest` to share utility graph validation code.
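
The consecutive-rescaling test described above can be pictured with a small standalone model. This is only a sketch, not Flink code: `RescaleSketch`, `Vertex`, `resolve`, and `rescale` are hypothetical names, and the `AUTO_MAX` sentinel value is an assumption standing in for `ExecutionConfig.PARALLELISM_AUTO_MAX`. It mirrors the pattern in the test, where each vertex is set to the placeholder before the graph is built with a concrete parallelism.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of rescaling; every name here is hypothetical and none of it is Flink API. */
public class RescaleSketch {

    /** Stand-in for ExecutionConfig.PARALLELISM_AUTO_MAX (the value is an assumption of this sketch). */
    public static final int AUTO_MAX = Integer.MAX_VALUE;

    /** Minimal stand-in for a job vertex: a name plus a configured parallelism. */
    public static final class Vertex {
        public final String name;
        public int parallelism;

        public Vertex(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** Replaces the AUTO_MAX placeholder with the requested parallelism; explicit settings stay untouched. */
    public static void resolve(List<Vertex> vertices, int requested) {
        if (requested < 1) {
            throw new IllegalArgumentException("parallelism must be positive: " + requested);
        }
        for (Vertex v : vertices) {
            if (v.parallelism == AUTO_MAX) {
                v.parallelism = requested;
            }
        }
    }

    /** Resets every vertex to the placeholder, then resolves it again with the new parallelism. */
    public static void rescale(List<Vertex> vertices, int requested) {
        for (Vertex v : vertices) {
            v.parallelism = AUTO_MAX;
        }
        resolve(vertices, requested);
    }

    public static void main(String[] args) {
        List<Vertex> vertices = new ArrayList<>();
        vertices.add(new Vertex("source", AUTO_MAX));
        vertices.add(new Vertex("sink", AUTO_MAX));

        // Initial build, then scale up, then scale down.
        for (int dop : new int[] {5, 10, 2}) {
            rescale(vertices, dop);
            for (Vertex v : vertices) {
                if (v.parallelism != dop) {
                    throw new AssertionError(v.name + " has parallelism " + v.parallelism);
                }
            }
        }
        System.out.println("all vertices follow each requested parallelism");
    }
}
```

Note that `resolve` alone leaves a vertex with an explicit parallelism unchanged, which corresponds to the limitation called out in the test's `TODO`.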

## Verifying this change

The changes themselves are a verification of existing features. See above.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: **no**

## Documentation

  - Does this pull request introduce a new feature? *no*
  - If yes, how is the feature documented? *not applicable*



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7124

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4510.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4510


commit ce4635f209c91241757662fda9e894557bac298b
Author: Tzu-Li (Gordon) Tai 
Date:   2017-08-10T05:41:40Z

[FLINK-7124] [flip-6] Add test to verify rescaling JobGraphs works correctly

This commit adds two tests to verify behaviours of rescaling JobGraphs:
1. JobGraphs can be consecutively rescaled to arbitrary valid DOPs
2. Rescaling beyond max parallelism would fail

The second test, however, is disabled for now since it
doesn't fail as expected.
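
As an illustration of the behaviour the second test expects, the following standalone sketch rejects a requested parallelism above the max parallelism. It is an assumption-laden model, not Flink code: `MaxParallelismSketch` and `checkedParallelism` are hypothetical names, and the commit does not say which exception type is expected or where such a check would live.

```java
/** Hypothetical guard for rescale requests; not part of Flink. */
public class MaxParallelismSketch {

    /**
     * Returns the requested parallelism if it lies in [1, maxParallelism].
     * Throwing IllegalArgumentException otherwise is an assumption of this sketch.
     */
    public static int checkedParallelism(int requested, int maxParallelism) {
        if (requested < 1 || requested > maxParallelism) {
            throw new IllegalArgumentException(
                "Requested parallelism " + requested + " is outside [1, " + maxParallelism + "]");
        }
        return requested;
    }

    public static void main(String[] args) {
        // A valid request passes through unchanged.
        System.out.println(checkedParallelism(5, 10));

        // A request beyond max parallelism must be rejected.
        try {
            checkedParallelism(20, 10);
            throw new AssertionError("expected the rescale request to be rejected");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```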

commit d2ecca2428d1801b4e96233c83216992da13a9bb
Author: Tzu-Li (Gordon) Tai 
Date:   2017-08-10T05:57:09Z

[hotfix] Refactor graph verification code in ExecutionGraphConstructionTest

The refactoring reuses utility methods in ExecutionGraphTestUtils to
verify constructed ExecutionGraphs.






[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-07-25 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101083#comment-16101083
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-7124:


[~trohrm...@apache.org]
I am currently blocked on fixes and testing for Flink 1.3.2 and haven't
started on this yet.
I intend to start on this issue near the end of the week.



[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-07-25 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100231#comment-16100231
 ] 

Till Rohrmann commented on FLINK-7124:
--

Hi [~tzulitai], what's the state of this issue?
