[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323191#comment-16323191 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5225

> Let CustomCommandLine return a ClusterDescriptor
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}}
> and deploy a cluster. In order to better separate concerns it would be good
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}}
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink
> cluster.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
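The separation of concerns described in the issue can be sketched roughly as follows. This is a minimal illustration, not Flink's actual code: every `Sketch*` type, the `sketch.address` key, and the returned strings are hypothetical stand-ins for the real `CustomCommandLine`, `ClusterDescriptor`, and `ClusterClient`. The point is only that the command line builds a descriptor and the caller decides whether to retrieve a client or deploy.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a ClusterClient: it only reports what it talks to. */
interface SketchClusterClient {
    String target();
}

/** Hypothetical stand-in for a ClusterDescriptor: the one place that retrieves or deploys. */
interface SketchClusterDescriptor {
    SketchClusterClient retrieve(String clusterId);

    SketchClusterClient deploySessionCluster();
}

/** Hypothetical stand-in for a CustomCommandLine: it creates descriptors, nothing more. */
interface SketchCommandLine {
    SketchClusterDescriptor createClusterDescriptor(Map<String, String> configuration);

    String getClusterId(Map<String, String> configuration);
}

/** Toy descriptor that fabricates clients from a configured address (assumed key). */
final class SketchInMemoryDescriptor implements SketchClusterDescriptor {
    private final String address;

    SketchInMemoryDescriptor(Map<String, String> configuration) {
        this.address = configuration.getOrDefault("sketch.address", "localhost");
    }

    @Override
    public SketchClusterClient retrieve(String clusterId) {
        return () -> clusterId + "@" + address;
    }

    @Override
    public SketchClusterClient deploySessionCluster() {
        return () -> "new-session@" + address;
    }
}

/** Toy command line: knows how to build a descriptor and name the cluster. */
final class SketchInMemoryCommandLine implements SketchCommandLine {
    @Override
    public SketchClusterDescriptor createClusterDescriptor(Map<String, String> configuration) {
        return new SketchInMemoryDescriptor(configuration);
    }

    @Override
    public String getClusterId(Map<String, String> configuration) {
        return "sketch-cluster";
    }
}

final class ClusterDescriptorSketch {
    /** Attach to an existing cluster: descriptor first, then retrieve. */
    static String attach(SketchCommandLine cli, Map<String, String> configuration) {
        SketchClusterDescriptor descriptor = cli.createClusterDescriptor(configuration);
        return descriptor.retrieve(cli.getClusterId(configuration)).target();
    }

    /** Start a new session cluster through the same descriptor. */
    static String deploy(SketchCommandLine cli, Map<String, String> configuration) {
        return cli.createClusterDescriptor(configuration).deploySessionCluster().target();
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put("sketch.address", "example-host");
        SketchCommandLine cli = new SketchInMemoryCommandLine();
        System.out.println(attach(cli, configuration));
        System.out.println(deploy(cli, configuration));
    }
}
```

With this shape, callers such as a `stop` or `cancel` action never ask the command line for a client directly; they go through the descriptor in both the attach and the deploy case.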
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322459#comment-16322459 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160995333

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
    @@ -51,45 +45,23 @@ public String getId() {
         }
     
         @Override
    -    public void addRunOptions(Options baseOptions) {
    -    }
    +    public ClusterDescriptor createClusterDescriptor(
    +            Configuration configuration,
    +            String configurationDirectory,
    +            CommandLine commandLine) {
    +        final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine);
     
    -    @Override
    -    public void addGeneralOptions(Options baseOptions) {
    +        return new StandaloneClusterDescriptor(effectiveConfiguration);
         }
     
         @Override
    -    public StandaloneClusterClient retrieveCluster(
    -            CommandLine commandLine,
    -            Configuration config,
    -            String configurationDirectory) {
    -
    -        if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
    -            String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
    -            InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
    -            setJobManagerAddressInConfig(config, jobManagerAddress);
    -        }
    -
    -        if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
    -            String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
    -            config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
    -        }
    -
    -        StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
    -        return descriptor.retrieve(null);
    +    @Nullable
    +    public String getClusterId(Configuration configuration, CommandLine commandLine) {
    +        return "standalone";
         }
     
         @Override
    -    public StandaloneClusterClient createCluster(
    -            String applicationName,
    -            CommandLine commandLine,
    -            Configuration config,
    -            String configurationDirectory,
    -            List userJarFiles) throws UnsupportedOperationException {
    -
    -        StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
    -        ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);
    -
    -        return descriptor.deploySessionCluster(clusterSpecification);
    +    public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) {
    +        return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
    --- End diff --

    ok
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322453#comment-16322453 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160994352

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
    @@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception {
             final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
    -        final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, configuration, configurationDirectory);
    +        final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(
    --- End diff --

    ok
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322425#comment-16322425 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5225

    Thanks for the review @GJL. I've addressed most of your comments and rebased onto the latest master.
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322396#comment-16322396 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160985701

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
    @@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception {
             final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
    -        final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, configuration, configurationDirectory);
    +        final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(
    --- End diff --

    You're right. I would like to postpone addressing this issue until we've introduced the typed cluster id. Otherwise I fear that it would cause quite a few merge conflicts.
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322395#comment-16322395 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160985377

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
    @@ -0,0 +1,293 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.client.cli.CliFrontendParser;
    +import org.apache.flink.client.deployment.ClusterSpecification;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.util.TestLogger;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +import org.apache.flink.yarn.configuration.YarnConfigOptions;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.Options;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.StandardOpenOption;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for the {@link FlinkYarnSessionCli}.
    + */
    +public class FlinkYarnSessionCliTest extends TestLogger {
    +
    +    private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(), 42);
    +
    +    private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = ApplicationId.newInstance(System.currentTimeMillis(), 43);
    +
    +    private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
    +    private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
    +
    +    private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID;
    +
    +    private static final String invalidPropertiesFile = "jasfobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT;
    +
    +    @Rule
    +    public TemporaryFolder tmp = new TemporaryFolder();
    +
    +    @Test
    +    public void testDynamicProperties() throws Exception {
    +
    +        FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
    +            "",
    +            "",
    +            false);
    +        Options options = new Options();
    +        cli.addGeneralOptions(options);
    +        cli.addRunOptions(options);
    +
    +        CommandLineParser parser = new DefaultParser();
    +        CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15",
    +            "-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"});
    +
    +        AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(
    +            new Configuration(),
    +            tmp.getRoot().getAbsolutePath(),
    +            null,
    +            cmd);
    +
    +        Assert.assertNotNull(flinkYarnDescriptor);
    +
    +        Map<String, String> dynProperties =
    +            FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded());
    +        assertEquals(2, dynProperties.size());
    +        assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
    +        assertEquals("-DappName=foobar", dynProperties.get("env.java.opts"));
    +    }
    +
    +    @Test
    +    public void
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322353#comment-16322353 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160978701

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.apache.hadoop.yarn.api.records.ApplicationReport;
    +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
    +import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    +import org.apache.hadoop.yarn.client.api.YarnClient;
    +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
    +import org.apache.hadoop.yarn.exceptions.YarnException;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.Map;
    +
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for the {@link AbstractYarnClusterDescriptor}.
    + */
    +public class AbstractYarnClusterTest extends TestLogger {
    +
    +    @Rule
    +    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +    /**
    +     * Tests that the cluster retrieval of a finished YARN application fails.
    +     */
    +    @Test(expected = RuntimeException.class)
    +    public void testClusterClientRetrievalOfFinishedYarnApplication() throws IOException {
    +        final ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 42);
    +        final String clusterId = applicationId.toString();
    +        final ApplicationReport applicationReport = ApplicationReport.newInstance(
    +            applicationId,
    +            ApplicationAttemptId.newInstance(applicationId, 0),
    +            "user",
    +            "queue",
    +            "name",
    +            "localhost",
    +            42,
    +            null,
    +            YarnApplicationState.FINISHED,
    +            null,
    +            null,
    +            1L,
    +            2L,
    +            FinalApplicationStatus.SUCCEEDED,
    +            null,
    +            null,
    +            1.0f,
    +            null,
    +            null);
    +
    +        final YarnClient yarnClient = new TestingYarnClient(Collections.singletonMap(applicationId, applicationReport));
    +
    +        final TestingAbstractYarnClusterDescriptor clusterDescriptor = new TestingAbstractYarnClusterDescriptor(
    +            new Configuration(),
    +            temporaryFolder.newFolder().getAbsolutePath(),
    +            yarnClient);
    +
    +        clusterDescriptor.retrieve(clusterId);
    +
    +        fail("We should not be able to retrieve ClusterClient for a finished Yarn application.");
    +    }
    +
    +    @Test
    +    public void testClusterClientRetrievalFromInvalidYarnId() {
    --- End diff --

    Forgot about it. Will add it.
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322349#comment-16322349 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160976939

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.cli;
    +
    +import org.apache.flink.client.ClientUtils;
    +import org.apache.flink.client.program.ClusterClient;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Option;
    +import org.apache.commons.cli.Options;
    +
    +import java.net.InetSocketAddress;
    +
    +import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;
    +
    +/**
    + * Base class for {@link CustomCommandLine} implementations which specify a JobManager address and
    + * a ZooKeeper namespace.
    + *
    + * @param type of the ClusterClient which is returned
    + */
    +public abstract class AbstractCustomCommandLine implements CustomCommandLine {
    +
    +    protected static final Option ZOOKEEPER_NAMESPACE_OPTION = new Option("z", "zookeeperNamespace", true,
    +        "Namespace to create the Zookeeper sub-paths for high availability mode");
    +
    +
    +    protected static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
    --- End diff --

    True. Will make it an instance field.
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322329#comment-16322329 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160975126

    --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.cli;
    +
    +import org.apache.flink.client.deployment.ClusterDescriptor;
    +import org.apache.flink.client.program.ClusterClient;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.net.InetSocketAddress;
    +
    +/**
    + * Tests for the {@link DefaultCLI}.
    + */
    +public class DefaultCLITest extends TestLogger {
    +
    +    @Rule
    +    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +    /**
    +     * Tests that the configuration is properly passed via the DefaultCLI to the
    +     * created ClusterDescriptor.
    +     */
    +    @Test
    +    public void testConfigurationPassing() throws Exception {
    +        final DefaultCLI defaultCLI = new DefaultCLI();
    +
    +        final String configurationDirectory = temporaryFolder.newFolder().getAbsolutePath();
    +        final String[] args = {};
    +
    +        CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
    +
    +        final String localhost = "localhost";
    +        final int port = 1234;
    +        final Configuration configuration = new Configuration();
    +
    +        configuration.setString(JobManagerOptions.ADDRESS, localhost);
    +        configuration.setInteger(JobManagerOptions.PORT, port);
    +
    +        final InetSocketAddress expectedAddress = new InetSocketAddress(localhost, port);
    +
    +        final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(
    +            configuration,
    +            configurationDirectory,
    +            commandLine);
    +
    +        final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(
    +            configuration,
    +            commandLine));
    +
    +        Assert.assertEquals(expectedAddress, clusterClient.getJobManagerAddress());
    +    }
    +
    +    /**
    +     * Tests that command line options override the configuration settings.
    +     */
    +    @Test
    +    public void testManualConfigurationOverride() throws Exception {
    +        final DefaultCLI defaultCLI = new DefaultCLI();
    +
    +        final String manualHostname = "123.123.123.123";
    +        final int manualPort = 4321;
    +        final String configurationDirectory = temporaryFolder.newFolder().getAbsolutePath();
    +        final String[] args = {"-m", manualHostname + ':' + manualPort};
    +
    +        CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
    +
    +        final String localhost = "localhost";
    +        final int port = 1234;
    +        final Configuration configuration = new Configuration();
    +
    +        configuration.setString(JobManagerOptions.ADDRESS, localhost);
    +        configuration.setInteger(JobManagerOptions.PORT, port);
    +
    +        final InetSocketAddress expectedAddress = new InetSocketAddress(manualHostname, manualPort);
    --- End diff --

    Will change it.
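The behaviour exercised by `testManualConfigurationOverride` in the quoted diff, where a `-m host:port` argument takes precedence over the configured address, can be sketched without Flink as below. This is an assumption-laden illustration, not the body of `applyCommandLineOptionsToConfiguration`: the class name, the two `sketch.*` keys, and the plain `Map` standing in for `Configuration` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class AddressOverrideSketch {
    // Hypothetical keys; they are not Flink's real configuration option names.
    static final String ADDRESS_KEY = "sketch.jobmanager.address";
    static final String PORT_KEY = "sketch.jobmanager.port";

    /**
     * Returns a copy of the configuration in which the address and port are replaced by the
     * command line value, if one was given. The input map is never modified.
     */
    static Map<String, String> applyAddressOption(Map<String, String> configuration, String hostAndPort) {
        Map<String, String> effective = new HashMap<>(configuration);
        if (hostAndPort == null) {
            return effective; // no -m option: keep the configured values
        }
        int separator = hostAndPort.lastIndexOf(':');
        if (separator <= 0 || separator == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got " + hostAndPort);
        }
        String port = hostAndPort.substring(separator + 1);
        Integer.parseInt(port); // rejects a non-numeric port with NumberFormatException
        effective.put(ADDRESS_KEY, hostAndPort.substring(0, separator));
        effective.put(PORT_KEY, port);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put(ADDRESS_KEY, "localhost");
        configuration.put(PORT_KEY, "1234");

        Map<String, String> effective = applyAddressOption(configuration, "123.123.123.123:4321");
        System.out.println(effective.get(ADDRESS_KEY) + ":" + effective.get(PORT_KEY));
    }
}
```

Returning a copy rather than mutating the input mirrors the `effectiveConfiguration` naming in the quoted `DefaultCLI` diff, although whether the real method copies or mutates is not visible in this thread.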
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322285#comment-16322285 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160972443

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
    @@ -561,7 +640,7 @@ protected int cancel(String[] args) throws Exception {
          *
          * @param args Command line arguments for the cancel action.
          */
    -    protected int savepoint(String[] args) throws CliArgsException {
    +    protected void savepoint(String[] args) throws Exception {
    --- End diff --

    I forgot to remove the return codes from the other methods. Will change it.
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322275#comment-16322275 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160970851

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
    @@ -561,7 +640,7 @@ protected int cancel(String[] args) throws Exception {
          *
          * @param args Command line arguments for the cancel action.
    --- End diff --

    good catch
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322274#comment-16322274 ]

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160970724

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
    @@ -51,45 +45,23 @@ public String getId() {
         }
     
         @Override
    -    public void addRunOptions(Options baseOptions) {
    -    }
    +    public ClusterDescriptor createClusterDescriptor(
    +            Configuration configuration,
    +            String configurationDirectory,
    +            CommandLine commandLine) {
    +        final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine);
     
    -    @Override
    -    public void addGeneralOptions(Options baseOptions) {
    +        return new StandaloneClusterDescriptor(effectiveConfiguration);
         }
     
         @Override
    -    public StandaloneClusterClient retrieveCluster(
    -            CommandLine commandLine,
    -            Configuration config,
    -            String configurationDirectory) {
    -
    -        if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
    -            String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
    -            InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
    -            setJobManagerAddressInConfig(config, jobManagerAddress);
    -        }
    -
    -        if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
    -            String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
    -            config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
    -        }
    -
    -        StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
    -        return descriptor.retrieve(null);
    +    @Nullable
    +    public String getClusterId(Configuration configuration, CommandLine commandLine) {
    +        return "standalone";
         }
     
         @Override
    -    public StandaloneClusterClient createCluster(
    -            String applicationName,
    -            CommandLine commandLine,
    -            Configuration config,
    -            String configurationDirectory,
    -            List userJarFiles) throws UnsupportedOperationException {
    -
    -        StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
    -        ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);
    -
    -        return descriptor.deploySessionCluster(clusterSpecification);
    +    public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) {
    +        return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
    --- End diff --

    After thinking again about it, I think it's ok how it is. The unsupported operation is `StandaloneClusterDescriptor#deploySessionCluster`. But this has nothing to do with the `DefaultCLI#getClusterSpecification`. In the future it might very well be that we can also start a standalone cluster programmatically.
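The distinction drawn in the comment above, that producing a specification is harmless while deploying is the unsupported step, might look like this in isolation. It is only a sketch under assumptions: `SketchClusterSpecification`, `SketchStandaloneDeployment`, the single `numberTaskManagers` field, and the exception message are hypothetical and do not reproduce Flink's `ClusterSpecification` or `StandaloneClusterDescriptor`.

```java
/** Hypothetical, trimmed-down specification: describes a cluster, deploys nothing. */
final class SketchClusterSpecification {
    final int numberTaskManagers;

    SketchClusterSpecification(int numberTaskManagers) {
        this.numberTaskManagers = numberTaskManagers;
    }
}

final class SketchStandaloneDeployment {
    /** Building a default specification always succeeds; it is pure data. */
    static SketchClusterSpecification getClusterSpecification() {
        return new SketchClusterSpecification(1);
    }

    /** Deployment is the operation that is rejected for the standalone case in this sketch. */
    static String deploySessionCluster(SketchClusterSpecification specification) {
        throw new UnsupportedOperationException("Cannot deploy a standalone cluster in this sketch.");
    }

    /** Reports whether deployment works, without letting the exception escape. */
    static boolean deploymentSupported() {
        try {
            deploySessionCluster(getClusterSpecification());
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("task managers in spec: " + getClusterSpecification().numberTaskManagers);
        System.out.println("deployment supported: " + deploymentSupported());
    }
}
```

Keeping the exception in the deploy step means that a later change allowing programmatic standalone deployment would only touch that one method, which appears to be the reasoning behind leaving `getClusterSpecification` as it is.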
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322269#comment-16322269 ] ASF GitHub Bot commented on FLINK-8339: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160970084 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java --- @@ -51,45 +45,23 @@ public String getId() { } @Override - public void addRunOptions(Options baseOptions) { - } + public ClusterDescriptor createClusterDescriptor( + Configuration configuration, + String configurationDirectory, + CommandLine commandLine) { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine); - @Override - public void addGeneralOptions(Options baseOptions) { + return new StandaloneClusterDescriptor(effectiveConfiguration); } @Override - public StandaloneClusterClient retrieveCluster( - CommandLine commandLine, - Configuration config, - String configurationDirectory) { - - if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) { - String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt()); - InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort); - setJobManagerAddressInConfig(config, jobManagerAddress); - } - - if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) { - String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt()); - config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); - } - - StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); - return descriptor.retrieve(null); + @Nullable + public String getClusterId(Configuration configuration, CommandLine commandLine) { + return "standalone"; } @Override - public StandaloneClusterClient createCluster( - String applicationName, - 
CommandLine commandLine, - Configuration config, - String configurationDirectory, - List userJarFiles) throws UnsupportedOperationException { - - StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); - ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config); - - return descriptor.deploySessionCluster(clusterSpecification); + public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); --- End diff -- True. Will throw an `UnsupportedOperationException`. > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
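The reply above proposes throwing an `UnsupportedOperationException` instead of returning a default specification. A minimal sketch of that idea follows. `SketchClusterSpecification` and `StandaloneCliSketch` are hypothetical stand-ins, not Flink classes, and the stated reason in the exception message is an assumption made for illustration.

```java
/** Hypothetical stand-in for Flink's ClusterSpecification; not the real class. */
final class SketchClusterSpecification {}

/** Hypothetical stand-in for a CLI that only targets an already running cluster. */
final class StandaloneCliSketch {

    SketchClusterSpecification getClusterSpecification() {
        // Fail loudly rather than hand back a default specification
        // that no caller is expected to use (assumed rationale).
        throw new UnsupportedOperationException(
            "This CLI sketch cannot deploy clusters, so it has no cluster specification.");
    }
}
```

A caller that asks such a CLI for a specification then gets an immediate error instead of a silently meaningless default.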
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16316043#comment-16316043 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160110442 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.client.cli.CliFrontendParser; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link FlinkYarnSessionCli}. 
+ */ +public class FlinkYarnSessionCliTest extends TestLogger { + + private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(), 42); + + private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = ApplicationId.newInstance(System.currentTimeMillis(), 43); + + private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55"; + private static final int TEST_YARN_JOB_MANAGER_PORT = 6655; + + private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID; + + private static final String invalidPropertiesFile = "jasfobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT; + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testDynamicProperties() throws Exception { + + FlinkYarnSessionCli cli = new FlinkYarnSessionCli( + "", + "", + false); + Options options = new Options(); + cli.addGeneralOptions(options); + cli.addRunOptions(options); + + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", + "-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"}); + + AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor( + new Configuration(), + tmp.getRoot().getAbsolutePath(), + null, + cmd); + + Assert.assertNotNull(flinkYarnDescriptor); + + Map<String, String> dynProperties = + FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded()); + assertEquals(2, dynProperties.size()); + assertEquals("5 min", dynProperties.get("akka.ask.timeout")); + assertEquals("-DappName=foobar", dynProperties.get("env.java.opts")); + } + + @Test + public void
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16316025#comment-16316025 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160108309 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import static org.junit.Assert.fail; + +/** + * Tests for the {@link AbstractYarnClusterDescriptor}. + */ +public class AbstractYarnClusterTest extends TestLogger { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Tests that the cluster retrieval of a finished YARN application fails. 
+*/ + @Test(expected = RuntimeException.class) + public void testClusterClientRetrievalOfFinishedYarnApplication() throws IOException { + final ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 42); + final String clusterId = applicationId.toString(); + final ApplicationReport applicationReport = ApplicationReport.newInstance( + applicationId, + ApplicationAttemptId.newInstance(applicationId, 0), + "user", + "queue", + "name", + "localhost", + 42, + null, + YarnApplicationState.FINISHED, + null, + null, + 1L, + 2L, + FinalApplicationStatus.SUCCEEDED, + null, + null, + 1.0f, + null, + null); + + final YarnClient yarnClient = new TestingYarnClient(Collections.singletonMap(applicationId, applicationReport)); + + final TestingAbstractYarnClusterDescriptor clusterDescriptor = new TestingAbstractYarnClusterDescriptor( + new Configuration(), + temporaryFolder.newFolder().getAbsolutePath(), + yarnClient); + + clusterDescriptor.retrieve(clusterId); + + fail("We should not be able to retrieve ClusterClient for a finished Yarn application."); + } + + @Test + public void testClusterClientRetrievalFromInvalidYarnId() { --- End diff -- This test is empty. > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 >
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313183#comment-16313183 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159885568 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.net.InetSocketAddress; + +import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig; + +/** + * Base class for {@link CustomCommandLine} implementations which specify a JobManager address and + * a ZooKeeper namespace. 
+ * + * @param type of the ClusterClient which is returned + */ +public abstract class AbstractCustomCommandLine implements CustomCommandLine { + + protected static final Option ZOOKEEPER_NAMESPACE_OPTION = new Option("z", "zookeeperNamespace", true, + "Namespace to create the Zookeeper sub-paths for high availability mode"); + + + protected static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, --- End diff -- Why are the options here constants? In `FlinkYarnSessionCli` the `Option`s are instance variables. Also `Option`s are mutable. > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
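The concern about mutable `Option` constants can be illustrated with a small self-contained sketch. `MutableOption`, `SharedOptionCli` and `PerInstanceOptionCli` are hypothetical stand-ins (the sketch does not use commons-cli itself); the point is only that a mutable object held in a static constant is shared by every instance, while an instance field is not.

```java
/** Hypothetical stand-in for a mutable option type such as commons-cli's Option. */
final class MutableOption {
    private final String name;
    private boolean required;

    MutableOption(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    boolean isRequired() {
        return required;
    }

    void setRequired(boolean required) {
        this.required = required;
    }
}

/** Shares one option object across all instances through a static constant. */
final class SharedOptionCli {
    static final MutableOption ADDRESS_OPTION = new MutableOption("jobmanager");

    MutableOption addressOption() {
        return ADDRESS_OPTION;
    }
}

/** Gives every instance its own option object. */
final class PerInstanceOptionCli {
    private final MutableOption addressOption = new MutableOption("jobmanager");

    MutableOption addressOption() {
        return addressOption;
    }
}
```

With the static constant, a change made through one `SharedOptionCli` is visible through every other one; with the instance field, each `PerInstanceOptionCli` keeps its own state. Whether that matters in practice depends on whether any caller ever mutates the options.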
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311707#comment-16311707 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159707553 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java --- @@ -62,12 +63,21 @@ public void addGeneralOptions(Options baseOptions) { } @Override - public T retrieveCluster(CommandLine commandLine, Configuration config, String configurationDirectory) throws UnsupportedOperationException { - return clusterClient; + public ClusterDescriptor createClusterDescriptor( + Configuration configuration, + String configurationDirectory, + CommandLine commandLine) { + return new DummyClusterDescriptor<>(clusterClient); } @Override - public T createCluster(String applicationName, CommandLine commandLine, Configuration config, String configurationDirectory, List userJarFiles) throws Exception { - return clusterClient; + @Nullable + public String getClusterId(Configuration configuration, CommandLine commandLine) { + return "dummy"; + } + + @Override + public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); --- End diff -- See comment on `DefaultCLI.java` > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. 
In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311705#comment-16311705 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159707313 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli.util; + +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.Preconditions; + +/** + * Dummy {@link ClusterDescriptor} implementation for testing purposes. 
+ * + * @param type of the returned {@link ClusterClient} + */ +public class DummyClusterDescriptor implements ClusterDescriptor { + + private final C clusterClient; + + public DummyClusterDescriptor(C clusterClient) { + this.clusterClient = Preconditions.checkNotNull(clusterClient); + } + + @Override + public String getClusterDescription() { + return null; + } + + @Override + public C retrieve(String applicationID) throws UnsupportedOperationException { + return clusterClient; + } + + @Override + public C deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException { + return clusterClient; + } + + @Override + public C deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) { + return clusterClient; + } + + @Override + public void close() throws Exception { --- End diff -- nit: `throws Exception` not needed. Doesn't really matter either at the moment. > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311706#comment-16311706 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159706464 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.cli.CommandLine; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.net.InetSocketAddress; + +/** + * Tests for the {@link DefaultCLI}. 
+ */ +public class DefaultCLITest extends TestLogger { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Tests that the configuration is properly passed via the DefaultCLI to the +* created ClusterDescriptor. +*/ + @Test + public void testConfigurationPassing() throws Exception { + final DefaultCLI defaultCLI = new DefaultCLI(); + + final String configurationDirectory = temporaryFolder.newFolder().getAbsolutePath(); + final String[] args = {}; + + CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false); + + final String localhost = "localhost"; + final int port = 1234; + final Configuration configuration = new Configuration(); + + configuration.setString(JobManagerOptions.ADDRESS, localhost); + configuration.setInteger(JobManagerOptions.PORT, port); + + final InetSocketAddress expectedAddress = new InetSocketAddress(localhost, port); + + final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor( + configuration, + configurationDirectory, + commandLine); + + final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId( + configuration, + commandLine)); + + Assert.assertEquals(expectedAddress, clusterClient.getJobManagerAddress()); + } + + /** +* Tests that command line options override the configuration settings. 
+*/ + @Test + public void testManualConfigurationOverride() throws Exception { + final DefaultCLI defaultCLI = new DefaultCLI(); + + final String manualHostname = "123.123.123.123"; + final int manualPort = 4321; + final String configurationDirectory = temporaryFolder.newFolder().getAbsolutePath(); + final String[] args = {"-m", manualHostname + ':' + manualPort}; + + CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false); + + final String localhost = "localhost"; + final int port = 1234; + final Configuration configuration = new Configuration(); + + configuration.setString(JobManagerOptions.ADDRESS, localhost); + configuration.setInteger(JobManagerOptions.PORT, port); + + final InetSocketAddress expectedAddress = new InetSocketAddress(manualHostname, manualPort); --- End diff -- nit: variable is declared too early. Will be used at the `assertEquals`. > Let CustomCommandLine return a ClusterDescriptor > > >
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311704#comment-16311704 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159702950 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -561,7 +640,7 @@ protected int cancel(String[] args) throws Exception { * * @param args Command line arguments for the cancel action. --- End diff -- Javadoc is wrong. This is not the *cancel* action.
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311708#comment-16311708 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159703483 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -561,7 +640,7 @@ protected int cancel(String[] args) throws Exception { * * @param args Command line arguments for the cancel action. */ - protected int savepoint(String[] args) throws CliArgsException { + protected void savepoint(String[] args) throws Exception { --- End diff -- Why was this changed? It's the only command which doesn't return a status code. The diff would have been smaller without it.
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311709#comment-16311709 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159706927 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli.util; + +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.Preconditions; + +/** + * Dummy {@link ClusterDescriptor} implementation for testing purposes. 
+ * + * @param type of the returned {@link ClusterClient} + */ +public class DummyClusterDescriptor implements ClusterDescriptor { + + private final C clusterClient; + + public DummyClusterDescriptor(C clusterClient) { + this.clusterClient = Preconditions.checkNotNull(clusterClient); + } + + @Override + public String getClusterDescription() { + return null; --- End diff -- Method is not annotated with `@Nullable` in the `ClusterDescriptor` interface. > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311650#comment-16311650 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159698673 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java --- @@ -56,43 +51,30 @@ public String getId() { return "flip6"; } - @Override - public void addRunOptions(Options baseOptions) { - } - @Override public void addGeneralOptions(Options baseOptions) { + super.addGeneralOptions(baseOptions); baseOptions.addOption(FLIP_6); } @Override - public RestClusterClient retrieveCluster(CommandLine commandLine, Configuration config, String configurationDirectory) { - if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) { - String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt()); - InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort); - setJobManagerAddressInConfig(config, jobManagerAddress); - } - - if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) { - String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt()); - config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); - } + public ClusterDescriptor createClusterDescriptor( + Configuration configuration, + String configurationDirectory, + CommandLine commandLine) { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine); - Flip6StandaloneClusterDescriptor descriptor = new Flip6StandaloneClusterDescriptor(config); - return descriptor.retrieve(null); + return new Flip6StandaloneClusterDescriptor(effectiveConfiguration); } @Override - public RestClusterClient createCluster( - String applicationName, - CommandLine commandLine, - Configuration config, - 
String configurationDirectory, - List userJarFiles) throws UnsupportedOperationException { - - Flip6StandaloneClusterDescriptor descriptor = new Flip6StandaloneClusterDescriptor(config); - ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config); + @Nullable + public String getClusterId(Configuration configuration, CommandLine commandLine) { + return "flip6Standalone"; + } - return descriptor.deploySessionCluster(clusterSpecification); + @Override + public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); --- End diff -- Same thing here. See comment on `DefaultCLI.java`. > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311647#comment-16311647 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159702188 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception { final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, configuration, configurationDirectory); + final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor( --- End diff -- There seems to be a pattern of getting the `clusterDescriptor` and the `client` for all the commands. Shutting down the client and closing the descriptor is also repeated. IMO we should extract the pattern to avoid code duplication.
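One way the suggested extraction might look is sketched below. This is not the code from the pull request: `SketchClient`, `SketchDescriptor`, `ClusterAction` and `ClusterActions.runClusterAction` are hypothetical names standing in for `ClusterClient`, `ClusterDescriptor` and a helper in `CliFrontend`, and the real interfaces have more methods than shown here.

```java
/** Hypothetical stand-in for Flink's ClusterClient; not the real API. */
interface SketchClient {
    void shutdown() throws Exception;
}

/** Hypothetical stand-in for Flink's ClusterDescriptor; not the real API. */
interface SketchDescriptor<C extends SketchClient> extends AutoCloseable {
    C retrieve(String clusterId) throws Exception;
}

/** The command-specific part (stop, cancel, savepoint, ...) that runs against a client. */
@FunctionalInterface
interface ClusterAction<C extends SketchClient, R> {
    R run(C client) throws Exception;
}

final class ClusterActions {

    private ClusterActions() {}

    /**
     * Retrieves a client from the descriptor, runs the action, then shuts the
     * client down and closes the descriptor, even if the action throws.
     */
    static <C extends SketchClient, R> R runClusterAction(
            SketchDescriptor<C> descriptor,
            String clusterId,
            ClusterAction<C, R> action) throws Exception {
        try {
            C client = descriptor.retrieve(clusterId);
            try {
                return action.run(client);
            } finally {
                client.shutdown();
            }
        } finally {
            descriptor.close();
        }
    }
}
```

Each command would then only supply its own action, for example `runClusterAction(descriptor, clusterId, client -> 0)`, while retrieval and cleanup live in one place.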
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311649#comment-16311649 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159698245 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java --- @@ -51,45 +45,23 @@ public String getId() { } @Override - public void addRunOptions(Options baseOptions) { - } + public ClusterDescriptor createClusterDescriptor( + Configuration configuration, + String configurationDirectory, + CommandLine commandLine) { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine); - @Override - public void addGeneralOptions(Options baseOptions) { + return new StandaloneClusterDescriptor(effectiveConfiguration); } @Override - public StandaloneClusterClient retrieveCluster( - CommandLine commandLine, - Configuration config, - String configurationDirectory) { - - if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) { - String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt()); - InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort); - setJobManagerAddressInConfig(config, jobManagerAddress); - } - - if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) { - String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt()); - config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); - } - - StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); - return descriptor.retrieve(null); + @Nullable + public String getClusterId(Configuration configuration, CommandLine commandLine) { + return "standalone"; } @Override - public StandaloneClusterClient createCluster( - String applicationName, - CommandLine 
commandLine, - Configuration config, - String configurationDirectory, - List userJarFiles) throws UnsupportedOperationException { - - StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); - ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config); - - return descriptor.deploySessionCluster(clusterSpecification); + public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); --- End diff -- Can this code actually be executed? It shouldn't be possible to deploy a standalone cluster from what I understand: `StandaloneClusterDescriptor#deploySessionCluster(ClusterSpecification clusterSpecification)` ``` @Override public StandaloneClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException { throw new UnsupportedOperationException("Can't deploy a standalone cluster."); } ``` If this shouldn't run, maybe throw an Exception directly.
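The alternative raised in the comment above, failing early instead of building a specification that can never be deployed, might look roughly like this. This is a sketch only: `StandaloneCliSketch` and `Spec` are hypothetical stand-ins rather than Flink classes, and the exception message is borrowed from the `deploySessionCluster` snippet quoted in the comment.

```java
public class StandaloneCliSketch {

    /** Hypothetical stand-in for a cluster specification. */
    static final class Spec {
    }

    /** Fails fast: a standalone cluster cannot be deployed, so no specification is produced. */
    static Spec getClusterSpecification() {
        throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
    }

    /** Shows what a caller would observe when it asks for a specification anyway. */
    static String tryDeploy() {
        try {
            getClusterSpecification();
            return "deployed";
        } catch (UnsupportedOperationException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryDeploy());
    }
}
```

The trade-off is that callers which ask for the specification unconditionally would then need to handle the exception, which may be why the PR returns a default specification instead.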
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311648#comment-16311648 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r159656514 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java --- @@ -59,31 +60,49 @@ void addGeneralOptions(Options baseOptions); /** -* Retrieves a client for a running cluster. -* @param commandLine The command-line parameters from the CliFrontend -* @param config The Flink config -* @param configurationDirectory Directory for configuration files -* @return Client if a cluster could be retrieved -* @throws UnsupportedOperationException if the operation is not supported +* Create a {@link ClusterDescriptor} from the given configuration, configuration directory +* and the command line. +* +* @param configuration to create the ClusterDescriptor with +* @param configurationDirectory where the configuration was loaded from +* @param commandLine containing command line options relevant for the ClusterDescriptor +* @return ClusterDescriptor */ - ClusterType retrieveCluster( - CommandLine commandLine, - Configuration config, - String configurationDirectory) throws UnsupportedOperationException; + ClusterDescriptor createClusterDescriptor( + Configuration configuration, + String configurationDirectory, + CommandLine commandLine); /** -* Creates the client for the cluster. -* @param applicationName The application name to use -* @param commandLine The command-line options parsed by the CliFrontend -* @param config The Flink config to use -* @param configurationDirectory Directory for configuration files -*@param userJarFiles User jar files to include in the classpath of the cluster. @return The client to communicate with the cluster which the CustomCommandLine brought up. 
-* @throws Exception if the cluster could not be created +* Returns the cluster id if a cluster id was specified on the command line, otherwise it +* returns null. +* +* A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink +* cluster running on Yarn. +* +* @param configuration to be used for the cluster id retrieval +* @param commandLine containing command line options relevant for the cluster id retrieval +* @return Cluster id identifying the cluster to deploy jobs to or null */ - ClusterType createCluster( - String applicationName, - CommandLine commandLine, - Configuration config, - String configurationDirectory, - List userJarFiles) throws Exception; + @Nullable + String getClusterId(Configuration configuration, CommandLine commandLine); + + /** +* Returns the {@link ClusterSpecification} specified by the configuration and the command +* line options. This specification can be used to deploy a new Flink cluster. +* +* @param configuration to be used for the ClusterSpecification values +* @param commandLine containing command line options relevant for the ClusterSpecification +* @return ClusterSpecification for a new Flink cluster +*/ + ClusterSpecification getClusterSpecification( + Configuration configuration, + CommandLine commandLine); + + default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions) throws CliArgsException { --- End diff -- It seems that this is only needed for unit tests.
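The Javadoc in the `CustomCommandLine` diff above suggests how a caller might combine the three new methods: a non-null cluster id means "attach to a running cluster", while a null id means "deploy a new one from the specification". The sketch below illustrates that reading with simplified stand-ins. `Descriptor`, `CommandLineSketch`, `obtainClient` and `fixedCli` are hypothetical names, strings replace the real client and specification types, and the branching is an assumption about the caller rather than something shown in this diff.

```java
public class DescriptorUsageSketch {

    /** Hypothetical stand-in for a cluster descriptor; strings stand in for client objects. */
    interface Descriptor {
        String retrieve(String clusterId);

        String deploySessionCluster(String clusterSpecification);
    }

    /** Hypothetical, parameterless stand-in for the reworked CustomCommandLine methods. */
    interface CommandLineSketch {
        Descriptor createClusterDescriptor();

        /** Returns the cluster id given on the command line, or null if none was given. */
        String getClusterId();

        String getClusterSpecification();
    }

    /** Attaches to an existing cluster when an id is present, otherwise deploys a new one. */
    static String obtainClient(CommandLineSketch cli) {
        Descriptor descriptor = cli.createClusterDescriptor();
        String clusterId = cli.getClusterId();
        if (clusterId != null) {
            return descriptor.retrieve(clusterId);
        }
        return descriptor.deploySessionCluster(cli.getClusterSpecification());
    }

    /** Builds a fake command line that reports the given (possibly null) cluster id. */
    static CommandLineSketch fixedCli(String clusterId) {
        Descriptor descriptor = new Descriptor() {
            @Override
            public String retrieve(String id) {
                return "retrieved " + id;
            }

            @Override
            public String deploySessionCluster(String clusterSpecification) {
                return "deployed " + clusterSpecification;
            }
        };
        return new CommandLineSketch() {
            @Override
            public Descriptor createClusterDescriptor() {
                return descriptor;
            }

            @Override
            public String getClusterId() {
                return clusterId;
            }

            @Override
            public String getClusterSpecification() {
                return "default-spec";
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(obtainClient(fixedCli("application_1")));
        System.out.println(obtainClient(fixedCli(null)));
    }
}
```

Keeping the decision in the caller is what lets the command line stay a pure factory for the descriptor, which matches the separation of concerns the issue description asks for.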
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307771#comment-16307771 ] ASF GitHub Bot commented on FLINK-8339: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5225 [FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor ## What is the purpose of the change Instead of directly retrieving or deploying a Flink cluster, the CustomCommandLine now only returns a ClusterDescriptor which can be used for these operations. This disentangles the ClusterDescriptor and the CustomCommandLine a bit better supporting a proper lifecycle management of the former. This PR is based on #5224. ## Brief change log - Remove indirection that `CustomCommandLines` can retrieve and deploy Flink clusters - Let `CustomCommandLine` return `ClusterDescriptor` - Adapt command methods in `CliFrontend` to use the `ClusterDescriptor` instead ## Verifying this change - Covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink createClusterDescriptorFactory Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5225.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5225 commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b Author: Till Rohrmann Date: 2017-12-07T12:57:24Z [FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns. commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 192adb786a48d19b71e797355500652d51de6296 Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. commit a73b6e2850d4b2445a835914ba570d1057e59dfb Author: Till Rohrmann Date: 2018-01-02T06:42:18Z [FLINK-8333] [flip6] Separate deployment options from command options This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package.
commit e32fa9d4eb45bfac2d13ab112f127be29d273ebd Author: Till Rohrmann Date: 2018-01-02T06:59:34Z [FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend This commit changes how CustomCommandLines are registered at the CliFrontend. Henceforth, the CliFrontend is initialized with the set of CustomCommandLines instead of registering them statically. This improves maintainability and testability. commit 7972fff6e574f53b057eef6e5326d97f1129af66 Author: Till Rohrmann Date: 2018-01-02T08:22:12Z [FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor Instead of directly retrieving or deploying a Flink cluster, the CustomCommandLine now only returns a ClusterDescriptor which can be used for these operations. This disentangles the ClusterDescriptor and the CustomCommandLine a bit better supporting a proper lifecycle management of the former. > Let CustomCommandLine return a