[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323191#comment-16323191
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5225


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322459#comment-16322459
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160995333
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
@@ -51,45 +45,23 @@ public String getId() {
}
 
@Override
-   public void addRunOptions(Options baseOptions) {
-   }
+   public ClusterDescriptor 
createClusterDescriptor(
+   Configuration configuration,
+   String configurationDirectory,
+   CommandLine commandLine) {
+   final Configuration effectiveConfiguration = 
applyCommandLineOptionsToConfiguration(configuration, commandLine);
 
-   @Override
-   public void addGeneralOptions(Options baseOptions) {
+   return new StandaloneClusterDescriptor(effectiveConfiguration);
}
 
@Override
-   public StandaloneClusterClient retrieveCluster(
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory) {
-
-   if 
(commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
-   String addressWithPort = 
commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
-   InetSocketAddress jobManagerAddress = 
ClientUtils.parseHostPortAddress(addressWithPort);
-   setJobManagerAddressInConfig(config, jobManagerAddress);
-   }
-
-   if 
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
-   String zkNamespace = 
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-   config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
-   }
-
-   StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
-   return descriptor.retrieve(null);
+   @Nullable
+   public String getClusterId(Configuration configuration, CommandLine 
commandLine) {
+   return "standalone";
}
 
@Override
-   public StandaloneClusterClient createCluster(
-   String applicationName,
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory,
-   List userJarFiles) throws 
UnsupportedOperationException {
-
-   StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
-   ClusterSpecification clusterSpecification = 
ClusterSpecification.fromConfiguration(config);
-
-   return descriptor.deploySessionCluster(clusterSpecification);
+   public ClusterSpecification getClusterSpecification(Configuration 
configuration, CommandLine commandLine) {
+   return new 
ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --

ok


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322453#comment-16322453
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160994352
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception {
 
final CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(commandLine);
 
-   final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, configuration, 
configurationDirectory);
+   final ClusterDescriptor clusterDescriptor = 
activeCommandLine.createClusterDescriptor(
--- End diff --

ok


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322425#comment-16322425
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5225
  
Thanks for the review @GJL. I've addressed most of your comments and 
rebased onto the latest master.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322396#comment-16322396
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160985701
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception {
 
final CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(commandLine);
 
-   final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, configuration, 
configurationDirectory);
+   final ClusterDescriptor clusterDescriptor = 
activeCommandLine.createClusterDescriptor(
--- End diff --

You're right. I would like to postpone addressing this issue until we've 
introduce the typed cluster id. Otherwise I fear that it would inflict quite 
some merge conflicts.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322395#comment-16322395
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160985377
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link FlinkYarnSessionCli}.
+ */
+public class FlinkYarnSessionCliTest extends TestLogger {
+
+   private static final ApplicationId TEST_YARN_APPLICATION_ID = 
ApplicationId.newInstance(System.currentTimeMillis(), 42);
+
+   private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = 
ApplicationId.newInstance(System.currentTimeMillis(), 43);
+
+   private static final String TEST_YARN_JOB_MANAGER_ADDRESS = 
"22.33.44.55";
+   private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
+
+   private static final String validPropertiesFile = "applicationID=" + 
TEST_YARN_APPLICATION_ID;
+
+   private static final String invalidPropertiesFile = "jasfobManager=" + 
TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT;
+
+   @Rule
+   public TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testDynamicProperties() throws Exception {
+
+   FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
+   "",
+   "",
+   false);
+   Options options = new Options();
+   cli.addGeneralOptions(options);
+   cli.addRunOptions(options);
+
+   CommandLineParser parser = new DefaultParser();
+   CommandLine cmd = parser.parse(options, new String[]{"run", 
"-j", "fake.jar", "-n", "15",
+   "-D", "akka.ask.timeout=5 min", "-D", 
"env.java.opts=-DappName=foobar"});
+
+   AbstractYarnClusterDescriptor flinkYarnDescriptor = 
cli.createDescriptor(
+   new Configuration(),
+   tmp.getRoot().getAbsolutePath(),
+   null,
+   cmd);
+
+   Assert.assertNotNull(flinkYarnDescriptor);
+
+   Map dynProperties =
+   
FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded());
+   assertEquals(2, dynProperties.size());
+   assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
+   assertEquals("-DappName=foobar", 
dynProperties.get("env.java.opts"));
+   }
+
+   @Test
+   public void 

[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322353#comment-16322353
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160978701
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link AbstractYarnClusterDescriptor}.
+ */
+public class AbstractYarnClusterTest extends TestLogger {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that the cluster retrieval of a finished YARN application 
fails.
+*/
+   @Test(expected = RuntimeException.class)
+   public void testClusterClientRetrievalOfFinishedYarnApplication() 
throws IOException {
+   final ApplicationId applicationId = 
ApplicationId.newInstance(System.currentTimeMillis(), 42);
+   final String clusterId = applicationId.toString();
+   final ApplicationReport applicationReport = 
ApplicationReport.newInstance(
+   applicationId,
+   ApplicationAttemptId.newInstance(applicationId, 0),
+   "user",
+   "queue",
+   "name",
+   "localhost",
+   42,
+   null,
+   YarnApplicationState.FINISHED,
+   null,
+   null,
+   1L,
+   2L,
+   FinalApplicationStatus.SUCCEEDED,
+   null,
+   null,
+   1.0f,
+   null,
+   null);
+
+   final YarnClient yarnClient = new 
TestingYarnClient(Collections.singletonMap(applicationId, applicationReport));
+
+   final TestingAbstractYarnClusterDescriptor clusterDescriptor = 
new TestingAbstractYarnClusterDescriptor(
+   new Configuration(),
+   temporaryFolder.newFolder().getAbsolutePath(),
+   yarnClient);
+
+   clusterDescriptor.retrieve(clusterId);
+
+   fail("We should not be able to retrieve ClusterClient for a 
finished Yarn application.");
+   }
+
+   @Test
+   public void testClusterClientRetrievalFromInvalidYarnId() {
--- End diff --

Forgot about it. Will add it.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: 

[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322349#comment-16322349
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160976939
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.net.InetSocketAddress;
+
+import static 
org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;
+
+/**
+ * Base class for {@link CustomCommandLine} implementations which specify 
a JobManager address and
+ * a ZooKeeper namespace.
+ *
+ * @param  type of the ClusterClient which is returned
+ */
+public abstract class AbstractCustomCommandLine 
implements CustomCommandLine {
+
+   protected static final Option ZOOKEEPER_NAMESPACE_OPTION = new 
Option("z", "zookeeperNamespace", true,
+   "Namespace to create the Zookeeper sub-paths for high 
availability mode");
+
+
+   protected static final Option ADDRESS_OPTION = new Option("m", 
"jobmanager", true,
--- End diff --

True. Will make it an instance field.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322329#comment-16322329
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160975126
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.cli.CommandLine;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the {@link DefaultCLI}.
+ */
+public class DefaultCLITest extends TestLogger {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that the configuration is properly passed via the DefaultCLI 
to the
+* created ClusterDescriptor.
+*/
+   @Test
+   public void testConfigurationPassing() throws Exception {
+   final DefaultCLI defaultCLI = new DefaultCLI();
+
+   final String configurationDirectory = 
temporaryFolder.newFolder().getAbsolutePath();
+   final String[] args = {};
+
+   CommandLine commandLine = 
defaultCLI.parseCommandLineOptions(args, false);
+
+   final String localhost = "localhost";
+   final int port = 1234;
+   final Configuration configuration = new Configuration();
+
+   configuration.setString(JobManagerOptions.ADDRESS, localhost);
+   configuration.setInteger(JobManagerOptions.PORT, port);
+
+   final InetSocketAddress expectedAddress = new 
InetSocketAddress(localhost, port);
+
+   final ClusterDescriptor clusterDescriptor = 
defaultCLI.createClusterDescriptor(
+   configuration,
+   configurationDirectory,
+   commandLine);
+
+   final ClusterClient clusterClient = 
clusterDescriptor.retrieve(defaultCLI.getClusterId(
+   configuration,
+   commandLine));
+
+   Assert.assertEquals(expectedAddress, 
clusterClient.getJobManagerAddress());
+   }
+
+   /**
+* Tests that command line options override the configuration settings.
+*/
+   @Test
+   public void testManualConfigurationOverride() throws Exception {
+   final DefaultCLI defaultCLI = new DefaultCLI();
+
+   final String manualHostname = "123.123.123.123";
+   final int manualPort = 4321;
+   final String configurationDirectory = 
temporaryFolder.newFolder().getAbsolutePath();
+   final String[] args = {"-m", manualHostname + ':' + manualPort};
+
+   CommandLine commandLine = 
defaultCLI.parseCommandLineOptions(args, false);
+
+   final String localhost = "localhost";
+   final int port = 1234;
+   final Configuration configuration = new Configuration();
+
+   configuration.setString(JobManagerOptions.ADDRESS, localhost);
+   configuration.setInteger(JobManagerOptions.PORT, port);
+
+   final InetSocketAddress expectedAddress = new 
InetSocketAddress(manualHostname, manualPort);
--- End diff --

Will change it.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: 

[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322285#comment-16322285
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160972443
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -561,7 +640,7 @@ protected int cancel(String[] args) throws Exception {
 *
 * @param args Command line arguments for the cancel action.
 */
-   protected int savepoint(String[] args) throws CliArgsException {
+   protected void savepoint(String[] args) throws Exception {
--- End diff --

I forgot to remove the return codes from the other methods. Will change it.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322275#comment-16322275
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160970851
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -561,7 +640,7 @@ protected int cancel(String[] args) throws Exception {
 *
 * @param args Command line arguments for the cancel action.
--- End diff --

good catch


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322274#comment-16322274
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160970724
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
@@ -51,45 +45,23 @@ public String getId() {
}
 
@Override
-   public void addRunOptions(Options baseOptions) {
-   }
+   public ClusterDescriptor 
createClusterDescriptor(
+   Configuration configuration,
+   String configurationDirectory,
+   CommandLine commandLine) {
+   final Configuration effectiveConfiguration = 
applyCommandLineOptionsToConfiguration(configuration, commandLine);
 
-   @Override
-   public void addGeneralOptions(Options baseOptions) {
+   return new StandaloneClusterDescriptor(effectiveConfiguration);
}
 
@Override
-   public StandaloneClusterClient retrieveCluster(
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory) {
-
-   if 
(commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
-   String addressWithPort = 
commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
-   InetSocketAddress jobManagerAddress = 
ClientUtils.parseHostPortAddress(addressWithPort);
-   setJobManagerAddressInConfig(config, jobManagerAddress);
-   }
-
-   if 
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
-   String zkNamespace = 
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-   config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
-   }
-
-   StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
-   return descriptor.retrieve(null);
+   @Nullable
+   public String getClusterId(Configuration configuration, CommandLine 
commandLine) {
+   return "standalone";
}
 
@Override
-   public StandaloneClusterClient createCluster(
-   String applicationName,
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory,
-   List userJarFiles) throws 
UnsupportedOperationException {
-
-   StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
-   ClusterSpecification clusterSpecification = 
ClusterSpecification.fromConfiguration(config);
-
-   return descriptor.deploySessionCluster(clusterSpecification);
+   public ClusterSpecification getClusterSpecification(Configuration 
configuration, CommandLine commandLine) {
+   return new 
ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --

After thinking again about it, I think it's ok how it is. The unsupported 
operation is `StandaloneClusterDescriptor#deploySessionCluster`. But this has 
nothing to do with the `DefaultCLI#getClusterSpecification`. In the future it 
might very well be that we can also start a standalone cluster programmatically.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322269#comment-16322269
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160970084
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
@@ -51,45 +45,23 @@ public String getId() {
}
 
@Override
-   public void addRunOptions(Options baseOptions) {
-   }
+   public ClusterDescriptor 
createClusterDescriptor(
+   Configuration configuration,
+   String configurationDirectory,
+   CommandLine commandLine) {
+   final Configuration effectiveConfiguration = 
applyCommandLineOptionsToConfiguration(configuration, commandLine);
 
-   @Override
-   public void addGeneralOptions(Options baseOptions) {
+   return new StandaloneClusterDescriptor(effectiveConfiguration);
}
 
@Override
-   public StandaloneClusterClient retrieveCluster(
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory) {
-
-   if 
(commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
-   String addressWithPort = 
commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
-   InetSocketAddress jobManagerAddress = 
ClientUtils.parseHostPortAddress(addressWithPort);
-   setJobManagerAddressInConfig(config, jobManagerAddress);
-   }
-
-   if 
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
-   String zkNamespace = 
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-   config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
-   }
-
-   StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
-   return descriptor.retrieve(null);
+   @Nullable
+   public String getClusterId(Configuration configuration, CommandLine 
commandLine) {
+   return "standalone";
}
 
@Override
-   public StandaloneClusterClient createCluster(
-   String applicationName,
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory,
-   List userJarFiles) throws 
UnsupportedOperationException {
-
-   StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
-   ClusterSpecification clusterSpecification = 
ClusterSpecification.fromConfiguration(config);
-
-   return descriptor.deploySessionCluster(clusterSpecification);
+   public ClusterSpecification getClusterSpecification(Configuration 
configuration, CommandLine commandLine) {
+   return new 
ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --

True. Will throw an `UnsupportedOperationException`.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16316043#comment-16316043
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160110442
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link FlinkYarnSessionCli}.
+ */
+public class FlinkYarnSessionCliTest extends TestLogger {
+
+   private static final ApplicationId TEST_YARN_APPLICATION_ID = 
ApplicationId.newInstance(System.currentTimeMillis(), 42);
+
+   private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = 
ApplicationId.newInstance(System.currentTimeMillis(), 43);
+
+   private static final String TEST_YARN_JOB_MANAGER_ADDRESS = 
"22.33.44.55";
+   private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
+
+   private static final String validPropertiesFile = "applicationID=" + 
TEST_YARN_APPLICATION_ID;
+
+   private static final String invalidPropertiesFile = "jasfobManager=" + 
TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT;
+
+   @Rule
+   public TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testDynamicProperties() throws Exception {
+
+   FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
+   "",
+   "",
+   false);
+   Options options = new Options();
+   cli.addGeneralOptions(options);
+   cli.addRunOptions(options);
+
+   CommandLineParser parser = new DefaultParser();
+   CommandLine cmd = parser.parse(options, new String[]{"run", 
"-j", "fake.jar", "-n", "15",
+   "-D", "akka.ask.timeout=5 min", "-D", 
"env.java.opts=-DappName=foobar"});
+
+   AbstractYarnClusterDescriptor flinkYarnDescriptor = 
cli.createDescriptor(
+   new Configuration(),
+   tmp.getRoot().getAbsolutePath(),
+   null,
+   cmd);
+
+   Assert.assertNotNull(flinkYarnDescriptor);
+
+   Map dynProperties =
+   
FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded());
+   assertEquals(2, dynProperties.size());
+   assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
+   assertEquals("-DappName=foobar", 
dynProperties.get("env.java.opts"));
+   }
+
+   @Test
+   public void 

[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16316025#comment-16316025
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160108309
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link AbstractYarnClusterDescriptor}.
+ */
+public class AbstractYarnClusterTest extends TestLogger {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that the cluster retrieval of a finished YARN application 
fails.
+*/
+   @Test(expected = RuntimeException.class)
+   public void testClusterClientRetrievalOfFinishedYarnApplication() 
throws IOException {
+   final ApplicationId applicationId = 
ApplicationId.newInstance(System.currentTimeMillis(), 42);
+   final String clusterId = applicationId.toString();
+   final ApplicationReport applicationReport = 
ApplicationReport.newInstance(
+   applicationId,
+   ApplicationAttemptId.newInstance(applicationId, 0),
+   "user",
+   "queue",
+   "name",
+   "localhost",
+   42,
+   null,
+   YarnApplicationState.FINISHED,
+   null,
+   null,
+   1L,
+   2L,
+   FinalApplicationStatus.SUCCEEDED,
+   null,
+   null,
+   1.0f,
+   null,
+   null);
+
+   final YarnClient yarnClient = new 
TestingYarnClient(Collections.singletonMap(applicationId, applicationReport));
+
+   final TestingAbstractYarnClusterDescriptor clusterDescriptor = 
new TestingAbstractYarnClusterDescriptor(
+   new Configuration(),
+   temporaryFolder.newFolder().getAbsolutePath(),
+   yarnClient);
+
+   clusterDescriptor.retrieve(clusterId);
+
+   fail("We should not be able to retrieve ClusterClient for a 
finished Yarn application.");
+   }
+
+   @Test
+   public void testClusterClientRetrievalFromInvalidYarnId() {
--- End diff --

This test is empty.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>

[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313183#comment-16313183
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159885568
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.net.InetSocketAddress;
+
+import static 
org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;
+
+/**
+ * Base class for {@link CustomCommandLine} implementations which specify 
a JobManager address and
+ * a ZooKeeper namespace.
+ *
+ * @param  type of the ClusterClient which is returned
+ */
+public abstract class AbstractCustomCommandLine 
implements CustomCommandLine {
+
+   protected static final Option ZOOKEEPER_NAMESPACE_OPTION = new 
Option("z", "zookeeperNamespace", true,
+   "Namespace to create the Zookeeper sub-paths for high 
availability mode");
+
+
+   protected static final Option ADDRESS_OPTION = new Option("m", 
"jobmanager", true,
--- End diff --

Why are the options here constants? In `FlinkYarnSessionCli` the `Option`s 
are instance variables. Also `Option`s are mutable.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311707#comment-16311707
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159707553
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
 ---
@@ -62,12 +63,21 @@ public void addGeneralOptions(Options baseOptions) {
}
 
@Override
-   public T retrieveCluster(CommandLine commandLine, Configuration config, 
String configurationDirectory) throws UnsupportedOperationException {
-   return clusterClient;
+   public ClusterDescriptor createClusterDescriptor(
+   Configuration configuration,
+   String configurationDirectory,
+   CommandLine commandLine) {
+   return new DummyClusterDescriptor<>(clusterClient);
}
 
@Override
-   public T createCluster(String applicationName, CommandLine commandLine, 
Configuration config, String configurationDirectory, List userJarFiles) 
throws Exception {
-   return clusterClient;
+   @Nullable
+   public String getClusterId(Configuration configuration, CommandLine 
commandLine) {
+   return "dummy";
+   }
+
+   @Override
+   public ClusterSpecification getClusterSpecification(Configuration 
configuration, CommandLine commandLine) {
+   return new 
ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --

See comment on `DefaultCLI.java`


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311705#comment-16311705
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159707313
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli.util;
+
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Dummy {@link ClusterDescriptor} implementation for testing purposes.
+ *
+ * @param  type of the returned {@link ClusterClient}
+ */
+public class DummyClusterDescriptor implements 
ClusterDescriptor {
+
+   private final C clusterClient;
+
+   public DummyClusterDescriptor(C clusterClient) {
+   this.clusterClient = Preconditions.checkNotNull(clusterClient);
+   }
+
+   @Override
+   public String getClusterDescription() {
+   return null;
+   }
+
+   @Override
+   public C retrieve(String applicationID) throws 
UnsupportedOperationException {
+   return clusterClient;
+   }
+
+   @Override
+   public C deploySessionCluster(ClusterSpecification 
clusterSpecification) throws UnsupportedOperationException {
+   return clusterClient;
+   }
+
+   @Override
+   public C deployJobCluster(ClusterSpecification clusterSpecification, 
JobGraph jobGraph) {
+   return clusterClient;
+   }
+
+   @Override
+   public void close() throws Exception {
--- End diff --

nit: `throws Exception` not needed. Doesn't really matter either at the 
moment.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311706#comment-16311706
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159706464
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.cli.CommandLine;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the {@link DefaultCLI}.
+ */
+public class DefaultCLITest extends TestLogger {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that the configuration is properly passed via the DefaultCLI 
to the
+* created ClusterDescriptor.
+*/
+   @Test
+   public void testConfigurationPassing() throws Exception {
+   final DefaultCLI defaultCLI = new DefaultCLI();
+
+   final String configurationDirectory = 
temporaryFolder.newFolder().getAbsolutePath();
+   final String[] args = {};
+
+   CommandLine commandLine = 
defaultCLI.parseCommandLineOptions(args, false);
+
+   final String localhost = "localhost";
+   final int port = 1234;
+   final Configuration configuration = new Configuration();
+
+   configuration.setString(JobManagerOptions.ADDRESS, localhost);
+   configuration.setInteger(JobManagerOptions.PORT, port);
+
+   final InetSocketAddress expectedAddress = new 
InetSocketAddress(localhost, port);
+
+   final ClusterDescriptor clusterDescriptor = 
defaultCLI.createClusterDescriptor(
+   configuration,
+   configurationDirectory,
+   commandLine);
+
+   final ClusterClient clusterClient = 
clusterDescriptor.retrieve(defaultCLI.getClusterId(
+   configuration,
+   commandLine));
+
+   Assert.assertEquals(expectedAddress, 
clusterClient.getJobManagerAddress());
+   }
+
+   /**
+* Tests that command line options override the configuration settings.
+*/
+   @Test
+   public void testManualConfigurationOverride() throws Exception {
+   final DefaultCLI defaultCLI = new DefaultCLI();
+
+   final String manualHostname = "123.123.123.123";
+   final int manualPort = 4321;
+   final String configurationDirectory = 
temporaryFolder.newFolder().getAbsolutePath();
+   final String[] args = {"-m", manualHostname + ':' + manualPort};
+
+   CommandLine commandLine = 
defaultCLI.parseCommandLineOptions(args, false);
+
+   final String localhost = "localhost";
+   final int port = 1234;
+   final Configuration configuration = new Configuration();
+
+   configuration.setString(JobManagerOptions.ADDRESS, localhost);
+   configuration.setInteger(JobManagerOptions.PORT, port);
+
+   final InetSocketAddress expectedAddress = new 
InetSocketAddress(manualHostname, manualPort);
--- End diff --

nit: variable is declared too early. Will be used at the `assertEquals`.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> 

[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311704#comment-16311704
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159702950
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -561,7 +640,7 @@ protected int cancel(String[] args) throws Exception {
 *
 * @param args Command line arguments for the cancel action.
--- End diff --

Javadoc is wrong. This is not the *cancel* action.
  


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311708#comment-16311708
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159703483
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -561,7 +640,7 @@ protected int cancel(String[] args) throws Exception {
 *
 * @param args Command line arguments for the cancel action.
 */
-   protected int savepoint(String[] args) throws CliArgsException {
+   protected void savepoint(String[] args) throws Exception {
--- End diff --

Why was this changed? It's the only command which doesn't return a status 
code. The diff would have been smaller without it.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311709#comment-16311709
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159706927
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli.util;
+
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Dummy {@link ClusterDescriptor} implementation for testing purposes.
+ *
+ * @param  type of the returned {@link ClusterClient}
+ */
+public class DummyClusterDescriptor implements 
ClusterDescriptor {
+
+   private final C clusterClient;
+
+   public DummyClusterDescriptor(C clusterClient) {
+   this.clusterClient = Preconditions.checkNotNull(clusterClient);
+   }
+
+   @Override
+   public String getClusterDescription() {
+   return null;
--- End diff --

Method is not annotated with `@Nullable` in the `ClusterDescriptor` 
interface.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311650#comment-16311650
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159698673
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java ---
@@ -56,43 +51,30 @@ public String getId() {
return "flip6";
}
 
-   @Override
-   public void addRunOptions(Options baseOptions) {
-   }
-
@Override
public void addGeneralOptions(Options baseOptions) {
+   super.addGeneralOptions(baseOptions);
baseOptions.addOption(FLIP_6);
}
 
@Override
-   public RestClusterClient retrieveCluster(CommandLine commandLine, 
Configuration config, String configurationDirectory) {
-   if 
(commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
-   String addressWithPort = 
commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
-   InetSocketAddress jobManagerAddress = 
ClientUtils.parseHostPortAddress(addressWithPort);
-   setJobManagerAddressInConfig(config, jobManagerAddress);
-   }
-
-   if 
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
-   String zkNamespace = 
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-   config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
-   }
+   public ClusterDescriptor createClusterDescriptor(
+   Configuration configuration,
+   String configurationDirectory,
+   CommandLine commandLine) {
+   final Configuration effectiveConfiguration = 
applyCommandLineOptionsToConfiguration(configuration, commandLine);
 
-   Flip6StandaloneClusterDescriptor descriptor = new 
Flip6StandaloneClusterDescriptor(config);
-   return descriptor.retrieve(null);
+   return new 
Flip6StandaloneClusterDescriptor(effectiveConfiguration);
}
 
@Override
-   public RestClusterClient createCluster(
-   String applicationName,
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory,
-   List userJarFiles) throws 
UnsupportedOperationException {
-
-   Flip6StandaloneClusterDescriptor descriptor = new 
Flip6StandaloneClusterDescriptor(config);
-   ClusterSpecification clusterSpecification = 
ClusterSpecification.fromConfiguration(config);
+   @Nullable
+   public String getClusterId(Configuration configuration, CommandLine 
commandLine) {
+   return "flip6Standalone";
+   }
 
-   return descriptor.deploySessionCluster(clusterSpecification);
+   @Override
+   public ClusterSpecification getClusterSpecification(Configuration 
configuration, CommandLine commandLine) {
+   return new 
ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --

Same thing here. See comment on `DefaultCLI.java`.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311647#comment-16311647
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159702188
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception {
 
final CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(commandLine);
 
-   final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, configuration, 
configurationDirectory);
+   final ClusterDescriptor clusterDescriptor = 
activeCommandLine.createClusterDescriptor(
--- End diff --

It seems that there is a pattern of getting the `clusterDescriptor` and and 
the `client` for all the commands. Also shutting down the client and closing 
the descriptor is repeated. Imo we should extract the pattern to avoid code 
duplication.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311649#comment-16311649
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159698245
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
@@ -51,45 +45,23 @@ public String getId() {
}
 
@Override
-   public void addRunOptions(Options baseOptions) {
-   }
+   public ClusterDescriptor 
createClusterDescriptor(
+   Configuration configuration,
+   String configurationDirectory,
+   CommandLine commandLine) {
+   final Configuration effectiveConfiguration = 
applyCommandLineOptionsToConfiguration(configuration, commandLine);
 
-   @Override
-   public void addGeneralOptions(Options baseOptions) {
+   return new StandaloneClusterDescriptor(effectiveConfiguration);
}
 
@Override
-   public StandaloneClusterClient retrieveCluster(
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory) {
-
-   if 
(commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
-   String addressWithPort = 
commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
-   InetSocketAddress jobManagerAddress = 
ClientUtils.parseHostPortAddress(addressWithPort);
-   setJobManagerAddressInConfig(config, jobManagerAddress);
-   }
-
-   if 
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
-   String zkNamespace = 
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-   config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
-   }
-
-   StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
-   return descriptor.retrieve(null);
+   @Nullable
+   public String getClusterId(Configuration configuration, CommandLine 
commandLine) {
+   return "standalone";
}
 
@Override
-   public StandaloneClusterClient createCluster(
-   String applicationName,
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory,
-   List userJarFiles) throws 
UnsupportedOperationException {
-
-   StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
-   ClusterSpecification clusterSpecification = 
ClusterSpecification.fromConfiguration(config);
-
-   return descriptor.deploySessionCluster(clusterSpecification);
+   public ClusterSpecification getClusterSpecification(Configuration 
configuration, CommandLine commandLine) {
+   return new 
ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --

Can this code actually be executed? It shouldn't be possible to deploy a 
standalone cluster from what I understand:
`StandaloneClusterDescriptor#deploySessionCluster(ClusterSpecification 
clusterSpecification)`
```
@Override
public StandaloneClusterClient 
deploySessionCluster(ClusterSpecification clusterSpecification) throws 
UnsupportedOperationException {
throw new UnsupportedOperationException("Can't deploy a 
standalone cluster.");
}
```

If this shouldn't run, maybe throw an Exception directly.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311648#comment-16311648
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159656514
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java 
---
@@ -59,31 +60,49 @@
void addGeneralOptions(Options baseOptions);
 
/**
-* Retrieves a client for a running cluster.
-* @param commandLine The command-line parameters from the CliFrontend
-* @param config The Flink config
-* @param configurationDirectory Directory for configuration files
-* @return Client if a cluster could be retrieved
-* @throws UnsupportedOperationException if the operation is not 
supported
+* Create a {@link ClusterDescriptor} from the given configuration, 
configuration directory
+* and the command line.
+*
+* @param configuration to create the ClusterDescriptor with
+* @param configurationDirectory where the configuration was loaded from
+* @param commandLine containing command line options relevant for the 
ClusterDescriptor
+* @return ClusterDescriptor
 */
-   ClusterType retrieveCluster(
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory) throws 
UnsupportedOperationException;
+   ClusterDescriptor createClusterDescriptor(
+   Configuration configuration,
+   String configurationDirectory,
+   CommandLine commandLine);
 
/**
-* Creates the client for the cluster.
-* @param applicationName The application name to use
-* @param commandLine The command-line options parsed by the CliFrontend
-* @param config The Flink config to use
-* @param configurationDirectory Directory for configuration files
-*@param userJarFiles User jar files to include in the classpath of the 
cluster.  @return The client to communicate with the cluster which the 
CustomCommandLine brought up.
-* @throws Exception if the cluster could not be created
+* Returns the cluster id if a cluster id was specified on the command 
line, otherwise it
+* returns null.
+*
+* A cluster id identifies a running cluster, e.g. the Yarn 
application id for a Flink
+* cluster running on Yarn.
+*
+* @param configuration to be used for the cluster id retrieval
+* @param commandLine containing command line options relevant for the 
cluster id retrieval
+* @return Cluster id identifying the cluster to deploy jobs to or null
 */
-   ClusterType createCluster(
-   String applicationName,
-   CommandLine commandLine,
-   Configuration config,
-   String configurationDirectory,
-   List userJarFiles) throws Exception;
+   @Nullable
+   String getClusterId(Configuration configuration, CommandLine 
commandLine);
+
+   /**
+* Returns the {@link ClusterSpecification} specified by the 
configuration and the command
+* line options. This specification can be used to deploy a new Flink 
cluster.
+*
+* @param configuration to be used for the ClusterSpecification values
+* @param commandLine containing command line options relevant for the 
ClusterSpecification
+* @return ClusterSpecification for a new Flink cluster
+*/
+   ClusterSpecification getClusterSpecification(
+   Configuration configuration,
+   CommandLine commandLine);
+
+   default CommandLine parseCommandLineOptions(String[] args, boolean 
stopAtNonOptions) throws CliArgsException {
--- End diff --

It seems that this is only needed for unit tests.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307771#comment-16307771
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5225

[FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor

## What is the purpose of the change

Instead of directly retrieving or deploying a Flink cluster, the
CustomCommandLine now only returns a ClusterDescriptor which can be used
for these operations. This disentangles the ClusterDescriptor and the
CustomCommandLine a bit better supporting a proper lifecycle management
of the former.

This PR is based on #5224. 

## Brief change log

- Remove indirection that `CustomCommandLines` can retrieve and deploy 
Flink clusters
- Let `CustomCommandLine` return `ClusterDescriptor`
- Adapt command methods in `CliFrontend` to use the `ClusterDescriptor` 
instead

## Verifying this change

- Covered by existing tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
createClusterDescriptorFactory

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5225.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5225


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 192adb786a48d19b71e797355500652d51de6296
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

commit a73b6e2850d4b2445a835914ba570d1057e59dfb
Author: Till Rohrmann 
Date:   2018-01-02T06:42:18Z

[FLINK-8333] [flip6] Separate deployment options from command options

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.

commit e32fa9d4eb45bfac2d13ab112f127be29d273ebd
Author: Till Rohrmann 
Date:   2018-01-02T06:59:34Z

[FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend

This commit changes how CustomCommandLines are registered at the 
CliFrontend.
Henceforth, the CliFrontend is initialized with the set of 
CustomCommandLines
instead of registering them statically. This improves maintainability and
testability.

commit 7972fff6e574f53b057eef6e5326d97f1129af66
Author: Till Rohrmann 
Date:   2018-01-02T08:22:12Z

[FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor

Instead of directly retrieving or deploying a Flink cluster, the
CustomCommandLine now only returns a ClusterDescriptor which can be used
for these operations. This disentangles the ClusterDescriptor and the
CustomCommandLine a bit better supporting a proper lifecycle management
of the former.




> Let CustomCommandLine return a