[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5220 ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160722901 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java --- @@ -99,6 +69,44 @@ public void testStop() throws Exception { } } + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + // test unrecognized option + String[] parameters = { "-v", "-l" }; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.stop(parameters); + + fail("Should have failed."); + } + + @Test(expected = CliArgsException.class) --- End diff -- True. I'm, however, not a super fan of testing for exception messages. If the message is changed then you don't see it directly. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160721955 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java --- @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.client; - -import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.Flip6DefaultCLI; -import org.apache.flink.client.cli.RunOptions; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; - -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for the RUN command. - */ -public class CliFrontendRunTest { --- End diff -- Sorry for that. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160721122 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java --- @@ -99,6 +69,44 @@ public void testStop() throws Exception { } } + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + // test unrecognized option + String[] parameters = { "-v", "-l" }; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.stop(parameters); + + fail("Should have failed."); + } + + @Test(expected = CliArgsException.class) + public void testMissingJobId() throws Exception { + // test missing job id + String[] parameters = {}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.stop(parameters); + + fail("Should have failed."); + } + + @Test + public void testUnknownJobId() throws Exception { + // test unknown job Id + JobID jid = new JobID(); + + String[] parameters = { jid.toString() }; + StopTestCliFrontend testFrontend = new StopTestCliFrontend(true); + + try { + testFrontend.stop(parameters); + fail("Should have failed."); + } catch (IllegalArgumentException ignored) { + // expected + } + + Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class)); --- End diff -- True. Will adapt it accordingly. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160720306 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -882,42 +784,28 @@ protected ClusterClient retrieveClient(CommandLineOptions options) { } } - /** -* Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved -* from the provided {@link CommandLineOptions}. -* -* @param options CommandLineOptions specifying the JobManager URL -* @return Gateway to the JobManager -* @throws Exception -*/ - protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception { - logAndSysout("Retrieving JobManager."); - return retrieveClient(options).getJobManagerGateway(); - } - /** * Creates a {@link ClusterClient} object from the given command line options and other parameters. -* @param options Command line options +* @param customCommandLine custom command line to use to retrieve the client +* @param commandLine command line to use * @param program The program for which to create the client. * @throws Exception */ protected ClusterClient createClient( - CommandLineOptions options, + CustomCommandLine customCommandLine, --- End diff -- Was probably not necessary. Since this method will be removed anyway in a subsequent PR, I'll keep it as it is. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160719791 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -598,189 +535,154 @@ protected int cancel(String[] args) { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); targetDirectory = null; } catch (Exception e) { - LOG.error("Missing JobID in the command line arguments."); - System.out.println("Error: Specify a Job ID to cancel a job."); - return 1; + throw new CliArgsException("Missing JobID in the command line arguments: " + e.getMessage()); } } else { - LOG.error("Missing JobID in the command line arguments."); - System.out.println("Error: Specify a Job ID to cancel a job."); - return 1; + throw new CliArgsException("Missing JobID in the command line arguments."); } + final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); + final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory); + try { - CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); - try { - if (withSavepoint) { - if (targetDirectory == null) { - logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); - } else { - logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); - } - String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); - logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); + if (withSavepoint) { + if (targetDirectory == null) { + logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); } else { - logAndSysout("Cancelling job " + jobId + '.'); - client.cancel(jobId); - logAndSysout("Cancelled job " + jobId + '.'); + logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); } + String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); + logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); + } else { + logAndSysout("Cancelling job " + jobId + '.'); + client.cancel(jobId); + logAndSysout("Cancelled job " + jobId + '.'); + } - return 0; - } finally { + return 0; + } finally { + try { client.shutdown(); + } catch (Exception e) { + LOG.info("Could not properly shut down the client.", e); } } - catch (Throwable t) { - return handleError(t); - } } /** * Executes the SAVEPOINT action. * * @param args Command line arguments for the cancel action. */ - protected int savepoint(String[] args) { + protected int savepoint(String[] args) throws CliArgsException { LOG.info("Running 'savepoint' command."); - SavepointOptions options; - try { - options = CliFrontendParser.parseSavepointCommand(args); - } catch (CliArgsException e) { - return handleArgException(e); - } catch (Throwable t) { - return handleError(t); - } + SavepointOptions options =
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160715198 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -587,9 +526,7 @@ protected int cancel(String[] args) { try { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); } catch (Exception e) { - LOG.error("Error: The value for the Job ID is not a valid ID."); - System.out.println("Error: The value for the Job ID is not a valid ID."); - return 1; + throw new CliArgsException("The value for the JobID is not a valid ID: " + e.getMessage()); --- End diff -- Will change it. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160711940 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -515,33 +464,28 @@ protected int stop(String[] args) { if (stopArgs.length > 0) { String jobIdString = stopArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } - catch (Exception e) { - return handleError(e); - } + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- Good point. Will throw a more meaningful exception. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160710138 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -292,20 +267,14 @@ protected int run(String[] args) { * * @param args Command line arguments for the info action. */ - protected int info(String[] args) { + protected int info(String[] args) throws CliArgsException, FileNotFoundException, ProgramInvocationException { LOG.info("Running 'info' command."); - // Parse command line options - InfoOptions options; - try { - options = CliFrontendParser.parseInfoCommand(args); --- End diff -- Will remove it. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r160709495 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -274,19 +251,13 @@ protected int run(String[] args) { return executeProgram(program, client, userParallelism); } - catch (Throwable t) { - return handleError(t); - } finally { - if (client != null) { - try { - client.shutdown(); - } catch (Exception e) { - LOG.warn("Could not properly shut down the cluster client.", e); - } - } - if (program != null) { - program.deleteExtractedLibraries(); + program.deleteExtractedLibraries(); --- End diff -- Good catch. Will change it. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159446300 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java --- @@ -99,6 +69,44 @@ public void testStop() throws Exception { } } + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + // test unrecognized option + String[] parameters = { "-v", "-l" }; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.stop(parameters); + + fail("Should have failed."); + } + + @Test(expected = CliArgsException.class) --- End diff -- nit: Test could be stricter by asserting that the exception carries the expected message. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159447901 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java --- @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.client; - -import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.Flip6DefaultCLI; -import org.apache.flink.client.cli.RunOptions; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; - -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for the RUN command. - */ -public class CliFrontendRunTest { --- End diff -- File is not recognized as *moved* by git ð¢ ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159449988 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java --- @@ -42,70 +39,54 @@ /** * Tests for the CANCEL command. */ -public class CliFrontendCancelTest { +public class CliFrontendCancelTest extends TestLogger { @BeforeClass public static void init() { CliFrontendTestUtils.pipeSystemOutToNull(); } @Test - public void testCancel() { - try { - // test unrecognized option - { - String[] parameters = {"-v", "-l"}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode != 0); - } - - // test missing job id - { - String[] parameters = {}; - CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode != 0); - } + public void testCancel() throws Exception { - // test cancel properly - { - JobID jid = new JobID(); + // test cancel properly + { + JobID jid = new JobID(); - String[] parameters = { jid.toString() }; - CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false); + String[] parameters = { jid.toString() }; + CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false); - int retCode = testFrontend.cancel(parameters); - assertTrue(retCode == 0); + int retCode = testFrontend.cancel(parameters); + assertTrue(retCode == 0); --- End diff -- nit: Code is copied but it's better to use `assertEquals` for proper failure reasons. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159432084 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -598,189 +535,154 @@ protected int cancel(String[] args) { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); targetDirectory = null; } catch (Exception e) { - LOG.error("Missing JobID in the command line arguments."); - System.out.println("Error: Specify a Job ID to cancel a job."); - return 1; + throw new CliArgsException("Missing JobID in the command line arguments: " + e.getMessage()); } } else { - LOG.error("Missing JobID in the command line arguments."); - System.out.println("Error: Specify a Job ID to cancel a job."); - return 1; + throw new CliArgsException("Missing JobID in the command line arguments."); } + final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); + final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory); + try { - CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); - try { - if (withSavepoint) { - if (targetDirectory == null) { - logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); - } else { - logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); - } - String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); - logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); + if (withSavepoint) { + if (targetDirectory == null) { + logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); } else { - logAndSysout("Cancelling job " + jobId + '.'); - client.cancel(jobId); - logAndSysout("Cancelled job " + jobId + '.'); + logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); } + String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); + logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); + } else { + logAndSysout("Cancelling job " + jobId + '.'); + client.cancel(jobId); + logAndSysout("Cancelled job " + jobId + '.'); + } - return 0; - } finally { + return 0; + } finally { + try { client.shutdown(); + } catch (Exception e) { + LOG.info("Could not properly shut down the client.", e); } } - catch (Throwable t) { - return handleError(t); - } } /** * Executes the SAVEPOINT action. * * @param args Command line arguments for the cancel action. */ - protected int savepoint(String[] args) { + protected int savepoint(String[] args) throws CliArgsException { LOG.info("Running 'savepoint' command."); - SavepointOptions options; - try { - options = CliFrontendParser.parseSavepointCommand(args); - } catch (CliArgsException e) { - return handleArgException(e); - } catch (Throwable t) { - return handleError(t); - } + SavepointOptions options = CliFrontendParser.parseSavepointCommand(args);
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159439656 --- Diff: flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java --- @@ -99,6 +69,44 @@ public void testStop() throws Exception { } } + @Test(expected = CliArgsException.class) + public void testUnrecognizedOption() throws Exception { + // test unrecognized option + String[] parameters = { "-v", "-l" }; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.stop(parameters); + + fail("Should have failed."); + } + + @Test(expected = CliArgsException.class) + public void testMissingJobId() throws Exception { + // test missing job id + String[] parameters = {}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + testFrontend.stop(parameters); + + fail("Should have failed."); + } + + @Test + public void testUnknownJobId() throws Exception { + // test unknown job Id + JobID jid = new JobID(); + + String[] parameters = { jid.toString() }; + StopTestCliFrontend testFrontend = new StopTestCliFrontend(true); + + try { + testFrontend.stop(parameters); + fail("Should have failed."); + } catch (IllegalArgumentException ignored) { + // expected + } + + Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class)); --- End diff -- You are already mocking the behavior of the `client` with: ``` doThrow(new IllegalArgumentException("Test exception")).when(client).stop(any(JobID.class)); ``` Therefore the test should fail in the absence of the exception, i.e., if `stop` is not called. I think this `verify` is not needed: ``` @Test(expected = IllegalArgumentException.class) public void testUnknownJobId() throws Exception { // test unknown job Id JobID jid = new JobID(); String[] parameters = {jid.toString()}; StopTestCliFrontend testFrontend = new StopTestCliFrontend(true); testFrontend.stop(parameters); fail("Should have failed."); } ``` ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159435504 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -292,20 +267,14 @@ protected int run(String[] args) { * * @param args Command line arguments for the info action. */ - protected int info(String[] args) { + protected int info(String[] args) throws CliArgsException, FileNotFoundException, ProgramInvocationException { LOG.info("Running 'info' command."); - // Parse command line options - InfoOptions options; - try { - options = CliFrontendParser.parseInfoCommand(args); --- End diff -- `CliFrontendParser.parseInfoCommand()` is no longer in use. Can be deleted. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159422039 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -882,42 +784,28 @@ protected ClusterClient retrieveClient(CommandLineOptions options) { } } - /** -* Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved -* from the provided {@link CommandLineOptions}. -* -* @param options CommandLineOptions specifying the JobManager URL -* @return Gateway to the JobManager -* @throws Exception -*/ - protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception { - logAndSysout("Retrieving JobManager."); - return retrieveClient(options).getJobManagerGateway(); - } - /** * Creates a {@link ClusterClient} object from the given command line options and other parameters. -* @param options Command line options +* @param customCommandLine custom command line to use to retrieve the client +* @param commandLine command line to use * @param program The program for which to create the client. * @throws Exception */ protected ClusterClient createClient( - CommandLineOptions options, + CustomCommandLine customCommandLine, --- End diff -- Was it really necessary to change the signature? It seems that the behavior is otherwise the same. Why not leave the line ``` CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); ``` inside this method. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159425958 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -274,19 +251,13 @@ protected int run(String[] args) { return executeProgram(program, client, userParallelism); } - catch (Throwable t) { - return handleError(t); - } finally { - if (client != null) { - try { - client.shutdown(); - } catch (Exception e) { - LOG.warn("Could not properly shut down the cluster client.", e); - } - } - if (program != null) { - program.deleteExtractedLibraries(); + program.deleteExtractedLibraries(); --- End diff -- This won't run if `createClient()` fails. Before the client creation was inside the `try` block. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159431758 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -598,189 +535,154 @@ protected int cancel(String[] args) { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); targetDirectory = null; } catch (Exception e) { - LOG.error("Missing JobID in the command line arguments."); - System.out.println("Error: Specify a Job ID to cancel a job."); - return 1; + throw new CliArgsException("Missing JobID in the command line arguments: " + e.getMessage()); } } else { - LOG.error("Missing JobID in the command line arguments."); - System.out.println("Error: Specify a Job ID to cancel a job."); - return 1; + throw new CliArgsException("Missing JobID in the command line arguments."); } + final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); + final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory); + try { - CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); - try { - if (withSavepoint) { - if (targetDirectory == null) { - logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); - } else { - logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); - } - String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); - logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); + if (withSavepoint) { + if (targetDirectory == null) { + logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); } else { - logAndSysout("Cancelling job " + jobId + '.'); - client.cancel(jobId); - logAndSysout("Cancelled job " + jobId + '.'); + logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); } + String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); + logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); + } else { + logAndSysout("Cancelling job " + jobId + '.'); + client.cancel(jobId); + logAndSysout("Cancelled job " + jobId + '.'); + } - return 0; - } finally { + return 0; + } finally { + try { client.shutdown(); + } catch (Exception e) { + LOG.info("Could not properly shut down the client.", e); } } - catch (Throwable t) { - return handleError(t); - } } /** * Executes the SAVEPOINT action. * * @param args Command line arguments for the cancel action. */ - protected int savepoint(String[] args) { + protected int savepoint(String[] args) throws CliArgsException { LOG.info("Running 'savepoint' command."); - SavepointOptions options; - try { - options = CliFrontendParser.parseSavepointCommand(args); - } catch (CliArgsException e) { - return handleArgException(e); - } catch (Throwable t) { - return handleError(t); - } + SavepointOptions options = CliFrontendParser.parseSavepointCommand(args);
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159430427 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -587,9 +526,7 @@ protected int cancel(String[] args) { try { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); } catch (Exception e) { - LOG.error("Error: The value for the Job ID is not a valid ID."); - System.out.println("Error: The value for the Job ID is not a valid ID."); - return 1; + throw new CliArgsException("The value for the JobID is not a valid ID: " + e.getMessage()); --- End diff -- Maybe log the input value as well ``` try { new JobID(StringUtils.hexStringToByte("dea84ec368d886f1638c01447ec6951")); } catch (Exception e) { throw new CliArgsException("The value for the JobID is not a valid ID: " + e.getMessage()); } ``` message is: ``` The value for the JobID is not a valid ID: Argument bytes must by an array of 16 bytes ``` Also, this piece of code is duplicated. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159429333 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -515,33 +464,28 @@ protected int stop(String[] args) { if (stopArgs.length > 0) { String jobIdString = stopArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } - catch (Exception e) { - return handleError(e); - } + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- The exception's message won't be super meaningful: ``` new JobID(StringUtils.hexStringToByte("dupa")); ``` ``` java.lang.NumberFormatException: For input string: "du" ``` ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5220#discussion_r159422003 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -882,42 +784,28 @@ protected ClusterClient retrieveClient(CommandLineOptions options) { } } - /** -* Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved -* from the provided {@link CommandLineOptions}. -* -* @param options CommandLineOptions specifying the JobManager URL -* @return Gateway to the JobManager -* @throws Exception -*/ - protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception { - logAndSysout("Retrieving JobManager."); - return retrieveClient(options).getJobManagerGateway(); - } - /** * Creates a {@link ClusterClient} object from the given command line options and other parameters. -* @param options Command line options +* @param customCommandLine custom command line to use to retrieve the client +* @param commandLine command line to use * @param program The program for which to create the client. * @throws Exception */ protected ClusterClient createClient( - CommandLineOptions options, + CustomCommandLine customCommandLine, --- End diff -- Was it really necessary to change the signature? It seems that the behavior is otherwise the same. Why not leave the line ``` CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); ``` inside this method. ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5220 [FLINK-8333] [flip6] Separate deployment options from command options ## What is the purpose of the change This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package. This PR is based on #5219. ## Brief change log - Move `CliFrontend` to `org.apache.flink.client.cli` - Decouple creation of `RunOptions`, `ListOptions`, etc. from `CliFrontendParser` ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink splitCliOptions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5220.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5220 commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b Author: Till RohrmannDate: 2017-12-07T12:57:24Z [FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns. commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 0cd22bf559eb820f3e2d381686752f583f4f16ff Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. commit 2b4c1efb79ffdfd1423240047dce8e2f038958c7 Author: Till Rohrmann Date: 2017-12-20T16:19:59Z [FLINK-8333] [flip6] Separate deployment options from command options This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package. ---