[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5220


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160722901
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java
 ---
@@ -99,6 +69,44 @@ public void testStop() throws Exception {
}
}
 
+   @Test(expected = CliArgsException.class)
+   public void testUnrecognizedOption() throws Exception {
+   // test unrecognized option
+   String[] parameters = { "-v", "-l" };
+   CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   testFrontend.stop(parameters);
+
+   fail("Should have failed.");
+   }
+
+   @Test(expected = CliArgsException.class)
--- End diff --

True. However, I'm not a big fan of testing for exception messages. If 
the message is changed, you don't notice it directly.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160721955
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java ---
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client;
-
-import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.Flip6DefaultCLI;
-import org.apache.flink.client.cli.RunOptions;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the RUN command.
- */
-public class CliFrontendRunTest {
--- End diff --

Sorry for that.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160721122
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java
 ---
@@ -99,6 +69,44 @@ public void testStop() throws Exception {
}
}
 
+   @Test(expected = CliArgsException.class)
+   public void testUnrecognizedOption() throws Exception {
+   // test unrecognized option
+   String[] parameters = { "-v", "-l" };
+   CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   testFrontend.stop(parameters);
+
+   fail("Should have failed.");
+   }
+
+   @Test(expected = CliArgsException.class)
+   public void testMissingJobId() throws Exception {
+   // test missing job id
+   String[] parameters = {};
+   CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   testFrontend.stop(parameters);
+
+   fail("Should have failed.");
+   }
+
+   @Test
+   public void testUnknownJobId() throws Exception {
+   // test unknown job Id
+   JobID jid = new JobID();
+
+   String[] parameters = { jid.toString() };
+   StopTestCliFrontend testFrontend = new 
StopTestCliFrontend(true);
+
+   try {
+   testFrontend.stop(parameters);
+   fail("Should have failed.");
+   } catch (IllegalArgumentException ignored) {
+   // expected
+   }
+
+   Mockito.verify(testFrontend.client, 
times(1)).stop(any(JobID.class));
--- End diff --

True. Will adapt it accordingly.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160720306
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -882,42 +784,28 @@ protected ClusterClient 
retrieveClient(CommandLineOptions options) {
}
}
 
-   /**
-* Retrieves the {@link ActorGateway} for the JobManager. The 
ClusterClient is retrieved
-* from the provided {@link CommandLineOptions}.
-*
-* @param options CommandLineOptions specifying the JobManager URL
-* @return Gateway to the JobManager
-* @throws Exception
-*/
-   protected ActorGateway getJobManagerGateway(CommandLineOptions options) 
throws Exception {
-   logAndSysout("Retrieving JobManager.");
-   return retrieveClient(options).getJobManagerGateway();
-   }
-
/**
 * Creates a {@link ClusterClient} object from the given command line 
options and other parameters.
-* @param options Command line options
+* @param customCommandLine custom command line to use to retrieve the 
client
+* @param commandLine command line to use
 * @param program The program for which to create the client.
 * @throws Exception
 */
protected ClusterClient createClient(
-   CommandLineOptions options,
+   CustomCommandLine customCommandLine,
--- End diff --

It was probably not necessary. Since this method will be removed in a 
subsequent PR anyway, I'll keep it as it is.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160719791
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -598,189 +535,154 @@ protected int cancel(String[] args) {
jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
targetDirectory = null;
} catch (Exception e) {
-   LOG.error("Missing JobID in the command line 
arguments.");
-   System.out.println("Error: Specify a Job ID to 
cancel a job.");
-   return 1;
+   throw new CliArgsException("Missing JobID in 
the command line arguments: " + e.getMessage());
}
} else {
-   LOG.error("Missing JobID in the command line 
arguments.");
-   System.out.println("Error: Specify a Job ID to cancel a 
job.");
-   return 1;
+   throw new CliArgsException("Missing JobID in the 
command line arguments.");
}
 
+   final CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(commandLine);
+   final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory);
+
try {
-   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
-   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
-   try {
-   if (withSavepoint) {
-   if (targetDirectory == null) {
-   logAndSysout("Cancelling job " 
+ jobId + " with savepoint to default savepoint directory.");
-   } else {
-   logAndSysout("Cancelling job " 
+ jobId + " with savepoint to " + targetDirectory + '.');
-   }
-   String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
-   logAndSysout("Cancelled job " + jobId + 
". Savepoint stored in " + savepointPath + '.');
+   if (withSavepoint) {
+   if (targetDirectory == null) {
+   logAndSysout("Cancelling job " + jobId 
+ " with savepoint to default savepoint directory.");
} else {
-   logAndSysout("Cancelling job " + jobId 
+ '.');
-   client.cancel(jobId);
-   logAndSysout("Cancelled job " + jobId + 
'.');
+   logAndSysout("Cancelling job " + jobId 
+ " with savepoint to " + targetDirectory + '.');
}
+   String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
+   logAndSysout("Cancelled job " + jobId + ". 
Savepoint stored in " + savepointPath + '.');
+   } else {
+   logAndSysout("Cancelling job " + jobId + '.');
+   client.cancel(jobId);
+   logAndSysout("Cancelled job " + jobId + '.');
+   }
 
-   return 0;
-   } finally {
+   return 0;
+   } finally {
+   try {
client.shutdown();
+   } catch (Exception e) {
+   LOG.info("Could not properly shut down the 
client.", e);
}
}
-   catch (Throwable t) {
-   return handleError(t);
-   }
}
 
/**
 * Executes the SAVEPOINT action.
 *
 * @param args Command line arguments for the cancel action.
 */
-   protected int savepoint(String[] args) {
+   protected int savepoint(String[] args) throws CliArgsException {
LOG.info("Running 'savepoint' command.");
 
-   SavepointOptions options;
-   try {
-   options = CliFrontendParser.parseSavepointCommand(args);
-   } catch (CliArgsException e) {
-   return handleArgException(e);
-   } catch (Throwable t) {
-   return handleError(t);
-   }
+   SavepointOptions options = 

[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160715198
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -587,9 +526,7 @@ protected int cancel(String[] args) {
try {
jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
} catch (Exception e) {
-   LOG.error("Error: The value for the Job ID is 
not a valid ID.");
-   System.out.println("Error: The value for the 
Job ID is not a valid ID.");
-   return 1;
+   throw new CliArgsException("The value for the 
JobID is not a valid ID: " + e.getMessage());
--- End diff --

Will change it.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160711940
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -515,33 +464,28 @@ protected int stop(String[] args) {
 
if (stopArgs.length > 0) {
String jobIdString = stopArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   }
-   catch (Exception e) {
-   return handleError(e);
-   }
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

Good point. Will throw a more meaningful exception.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160710138
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -292,20 +267,14 @@ protected int run(String[] args) {
 *
 * @param args Command line arguments for the info action.
 */
-   protected int info(String[] args) {
+   protected int info(String[] args) throws CliArgsException, 
FileNotFoundException, ProgramInvocationException {
LOG.info("Running 'info' command.");
 
-   // Parse command line options
-   InfoOptions options;
-   try {
-   options = CliFrontendParser.parseInfoCommand(args);
--- End diff --

Will remove it.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r160709495
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -274,19 +251,13 @@ protected int run(String[] args) {
 
return executeProgram(program, client, userParallelism);
}
-   catch (Throwable t) {
-   return handleError(t);
-   }
finally {
-   if (client != null) {
-   try {
-   client.shutdown();
-   } catch (Exception e) {
-   LOG.warn("Could not properly shut down 
the cluster client.", e);
-   }
-   }
-   if (program != null) {
-   program.deleteExtractedLibraries();
+   program.deleteExtractedLibraries();
--- End diff --

Good catch. Will change it.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159446300
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java
 ---
@@ -99,6 +69,44 @@ public void testStop() throws Exception {
}
}
 
+   @Test(expected = CliArgsException.class)
+   public void testUnrecognizedOption() throws Exception {
+   // test unrecognized option
+   String[] parameters = { "-v", "-l" };
+   CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   testFrontend.stop(parameters);
+
+   fail("Should have failed.");
+   }
+
+   @Test(expected = CliArgsException.class)
--- End diff --

nit: Test could be stricter by asserting that the exception carries the 
expected message.
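
A minimal sketch of such a stricter check, in plain Java without JUnit so that it stands alone. `MessageAssertionSketch`, the nested `CliArgsException`, the `stop` method, and the message text are hypothetical stand-ins, not the actual Flink classes. The check looks only for a stable fragment of the message, which keeps the test from breaking on every rewording:

```java
public class MessageAssertionSketch {

    /** Hypothetical stand-in for Flink's CliArgsException. */
    static class CliArgsException extends Exception {
        CliArgsException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the argument validation done by the stop action. */
    static int stop(String[] args) throws CliArgsException {
        if (args.length == 0) {
            // Assumed message text, chosen for this sketch only.
            throw new CliArgsException("Missing JobID in the command line arguments.");
        }
        return 0;
    }

    /** Returns the message of the exception thrown for the given args, or null if none was thrown. */
    static String failureMessage(String[] args) {
        try {
            stop(args);
            return null;
        } catch (CliArgsException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        String message = failureMessage(new String[0]);
        // Assert on a stable fragment rather than on the full sentence.
        if (message == null || !message.contains("JobID")) {
            throw new AssertionError("Unexpected failure message: " + message);
        }
        System.out.println("Failed as expected: " + message);
    }
}
```

In a JUnit 4 test the same idea would be a `try`/`catch` followed by an assertion on `e.getMessage()`, instead of `@Test(expected = ...)`, which only checks the exception type.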
  


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159447901
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java ---
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client;
-
-import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.Flip6DefaultCLI;
-import org.apache.flink.client.cli.RunOptions;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the RUN command.
- */
-public class CliFrontendRunTest {
--- End diff --

File is not recognized as *moved* by git 😢 
  


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159449988
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java
 ---
@@ -42,70 +39,54 @@
 /**
  * Tests for the CANCEL command.
  */
-public class CliFrontendCancelTest {
+public class CliFrontendCancelTest extends TestLogger {
 
@BeforeClass
public static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
}
 
@Test
-   public void testCancel() {
-   try {
-   // test unrecognized option
-   {
-   String[] parameters = {"-v", "-l"};
-   CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
-   int retCode = testFrontend.cancel(parameters);
-   assertTrue(retCode != 0);
-   }
-
-   // test missing job id
-   {
-   String[] parameters = {};
-   CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
-   int retCode = testFrontend.cancel(parameters);
-   assertTrue(retCode != 0);
-   }
+   public void testCancel() throws Exception {
 
-   // test cancel properly
-   {
-   JobID jid = new JobID();
+   // test cancel properly
+   {
+   JobID jid = new JobID();
 
-   String[] parameters = { jid.toString() };
-   CancelTestCliFrontend testFrontend = new 
CancelTestCliFrontend(false);
+   String[] parameters = { jid.toString() };
+   CancelTestCliFrontend testFrontend = new 
CancelTestCliFrontend(false);
 
-   int retCode = testFrontend.cancel(parameters);
-   assertTrue(retCode == 0);
+   int retCode = testFrontend.cancel(parameters);
+   assertTrue(retCode == 0);
--- End diff --

nit: The code is copied, but it's better to use `assertEquals` so that a 
failure reports the expected and the actual value.


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159432084
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -598,189 +535,154 @@ protected int cancel(String[] args) {
jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
targetDirectory = null;
} catch (Exception e) {
-   LOG.error("Missing JobID in the command line 
arguments.");
-   System.out.println("Error: Specify a Job ID to 
cancel a job.");
-   return 1;
+   throw new CliArgsException("Missing JobID in 
the command line arguments: " + e.getMessage());
}
} else {
-   LOG.error("Missing JobID in the command line 
arguments.");
-   System.out.println("Error: Specify a Job ID to cancel a 
job.");
-   return 1;
+   throw new CliArgsException("Missing JobID in the 
command line arguments.");
}
 
+   final CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(commandLine);
+   final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory);
+
try {
-   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
-   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
-   try {
-   if (withSavepoint) {
-   if (targetDirectory == null) {
-   logAndSysout("Cancelling job " 
+ jobId + " with savepoint to default savepoint directory.");
-   } else {
-   logAndSysout("Cancelling job " 
+ jobId + " with savepoint to " + targetDirectory + '.');
-   }
-   String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
-   logAndSysout("Cancelled job " + jobId + 
". Savepoint stored in " + savepointPath + '.');
+   if (withSavepoint) {
+   if (targetDirectory == null) {
+   logAndSysout("Cancelling job " + jobId 
+ " with savepoint to default savepoint directory.");
} else {
-   logAndSysout("Cancelling job " + jobId 
+ '.');
-   client.cancel(jobId);
-   logAndSysout("Cancelled job " + jobId + 
'.');
+   logAndSysout("Cancelling job " + jobId 
+ " with savepoint to " + targetDirectory + '.');
}
+   String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
+   logAndSysout("Cancelled job " + jobId + ". 
Savepoint stored in " + savepointPath + '.');
+   } else {
+   logAndSysout("Cancelling job " + jobId + '.');
+   client.cancel(jobId);
+   logAndSysout("Cancelled job " + jobId + '.');
+   }
 
-   return 0;
-   } finally {
+   return 0;
+   } finally {
+   try {
client.shutdown();
+   } catch (Exception e) {
+   LOG.info("Could not properly shut down the 
client.", e);
}
}
-   catch (Throwable t) {
-   return handleError(t);
-   }
}
 
/**
 * Executes the SAVEPOINT action.
 *
 * @param args Command line arguments for the cancel action.
 */
-   protected int savepoint(String[] args) {
+   protected int savepoint(String[] args) throws CliArgsException {
LOG.info("Running 'savepoint' command.");
 
-   SavepointOptions options;
-   try {
-   options = CliFrontendParser.parseSavepointCommand(args);
-   } catch (CliArgsException e) {
-   return handleArgException(e);
-   } catch (Throwable t) {
-   return handleError(t);
-   }
+   SavepointOptions options = 
CliFrontendParser.parseSavepointCommand(args);
 

[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159439656
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java
 ---
@@ -99,6 +69,44 @@ public void testStop() throws Exception {
}
}
 
+   @Test(expected = CliArgsException.class)
+   public void testUnrecognizedOption() throws Exception {
+   // test unrecognized option
+   String[] parameters = { "-v", "-l" };
+   CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   testFrontend.stop(parameters);
+
+   fail("Should have failed.");
+   }
+
+   @Test(expected = CliArgsException.class)
+   public void testMissingJobId() throws Exception {
+   // test missing job id
+   String[] parameters = {};
+   CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   testFrontend.stop(parameters);
+
+   fail("Should have failed.");
+   }
+
+   @Test
+   public void testUnknownJobId() throws Exception {
+   // test unknown job Id
+   JobID jid = new JobID();
+
+   String[] parameters = { jid.toString() };
+   StopTestCliFrontend testFrontend = new 
StopTestCliFrontend(true);
+
+   try {
+   testFrontend.stop(parameters);
+   fail("Should have failed.");
+   } catch (IllegalArgumentException ignored) {
+   // expected
+   }
+
+   Mockito.verify(testFrontend.client, 
times(1)).stop(any(JobID.class));
--- End diff --

You are already mocking the behavior of the `client` with:
```
doThrow(new IllegalArgumentException("Test exception")).when(client).stop(any(JobID.class));
```

Therefore the test should fail in the absence of the exception, i.e., if 
`stop` is not called. I think this `verify` is not needed:
```
@Test(expected = IllegalArgumentException.class)
public void testUnknownJobId() throws Exception {
    // test unknown job Id
    JobID jid = new JobID();

    String[] parameters = {jid.toString()};
    StopTestCliFrontend testFrontend = new StopTestCliFrontend(true);

    testFrontend.stop(parameters);
    fail("Should have failed.");
}
```
  


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159435504
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -292,20 +267,14 @@ protected int run(String[] args) {
 *
 * @param args Command line arguments for the info action.
 */
-   protected int info(String[] args) {
+   protected int info(String[] args) throws CliArgsException, 
FileNotFoundException, ProgramInvocationException {
LOG.info("Running 'info' command.");
 
-   // Parse command line options
-   InfoOptions options;
-   try {
-   options = CliFrontendParser.parseInfoCommand(args);
--- End diff --

`CliFrontendParser.parseInfoCommand()` is no longer in use. Can be deleted.
  


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159422039
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -882,42 +784,28 @@ protected ClusterClient 
retrieveClient(CommandLineOptions options) {
}
}
 
-   /**
-* Retrieves the {@link ActorGateway} for the JobManager. The 
ClusterClient is retrieved
-* from the provided {@link CommandLineOptions}.
-*
-* @param options CommandLineOptions specifying the JobManager URL
-* @return Gateway to the JobManager
-* @throws Exception
-*/
-   protected ActorGateway getJobManagerGateway(CommandLineOptions options) 
throws Exception {
-   logAndSysout("Retrieving JobManager.");
-   return retrieveClient(options).getJobManagerGateway();
-   }
-
/**
 * Creates a {@link ClusterClient} object from the given command line 
options and other parameters.
-* @param options Command line options
+* @param customCommandLine custom command line to use to retrieve the 
client
+* @param commandLine command line to use
 * @param program The program for which to create the client.
 * @throws Exception
 */
protected ClusterClient createClient(
-   CommandLineOptions options,
+   CustomCommandLine customCommandLine,
--- End diff --

Was it really necessary to change the signature? It seems that the behavior 
is otherwise the same. Why not leave the line 
```
CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
```
inside this method?
  


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159425958
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -274,19 +251,13 @@ protected int run(String[] args) {
 
return executeProgram(program, client, userParallelism);
}
-   catch (Throwable t) {
-   return handleError(t);
-   }
finally {
-   if (client != null) {
-   try {
-   client.shutdown();
-   } catch (Exception e) {
-   LOG.warn("Could not properly shut down 
the cluster client.", e);
-   }
-   }
-   if (program != null) {
-   program.deleteExtractedLibraries();
+   program.deleteExtractedLibraries();
--- End diff --

This won't run if `createClient()` fails. Previously, the client creation 
was inside the `try` block.
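
The difference can be sketched in plain Java. All names here (`CleanupOrderSketch`, `Program`, `createFailingClient`) are hypothetical stand-ins for the Flink classes, and the client creation is made to fail on purpose. A `finally` block only runs once its `try` block has been entered, so a failure before the `try` skips the cleanup:

```java
import java.util.ArrayList;
import java.util.List;

public class CleanupOrderSketch {

    /** Records which cleanup steps ran, so the effect is observable. */
    static final List<String> events = new ArrayList<>();

    /** Hypothetical stand-in for PackagedProgram. */
    static class Program {
        void deleteExtractedLibraries() {
            events.add("libraries deleted");
        }
    }

    /** Hypothetical stand-in for createClient() that always fails. */
    static Object createFailingClient() {
        throw new IllegalStateException("client creation failed");
    }

    /** Client is created before the try block: the cleanup is skipped on failure. */
    static void createOutsideTry(Program program) {
        Object client = createFailingClient();
        try {
            events.add("using " + client);
        } finally {
            program.deleteExtractedLibraries();
        }
    }

    /** Client is created inside the try block: the cleanup always runs. */
    static void createInsideTry(Program program) {
        try {
            Object client = createFailingClient();
            events.add("using " + client);
        } finally {
            program.deleteExtractedLibraries();
        }
    }

    /** Runs one variant and returns the cleanup events it produced. */
    static List<String> run(boolean insideTry) {
        events.clear();
        try {
            if (insideTry) {
                createInsideTry(new Program());
            } else {
                createOutsideTry(new Program());
            }
        } catch (IllegalStateException expected) {
            // The client creation failure itself is expected in both variants.
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println("outside try: " + run(false)); // prints "outside try: []"
        System.out.println("inside try:  " + run(true));  // prints "inside try:  [libraries deleted]"
    }
}
```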


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159431758
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -598,189 +535,154 @@ protected int cancel(String[] args) {
jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
targetDirectory = null;
} catch (Exception e) {
-   LOG.error("Missing JobID in the command line 
arguments.");
-   System.out.println("Error: Specify a Job ID to 
cancel a job.");
-   return 1;
+   throw new CliArgsException("Missing JobID in 
the command line arguments: " + e.getMessage());
}
} else {
-   LOG.error("Missing JobID in the command line 
arguments.");
-   System.out.println("Error: Specify a Job ID to cancel a 
job.");
-   return 1;
+   throw new CliArgsException("Missing JobID in the 
command line arguments.");
}
 
+   final CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(commandLine);
+   final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory);
+
try {
-   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
-   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
-   try {
-   if (withSavepoint) {
-   if (targetDirectory == null) {
-   logAndSysout("Cancelling job " 
+ jobId + " with savepoint to default savepoint directory.");
-   } else {
-   logAndSysout("Cancelling job " 
+ jobId + " with savepoint to " + targetDirectory + '.');
-   }
-   String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
-   logAndSysout("Cancelled job " + jobId + 
". Savepoint stored in " + savepointPath + '.');
+   if (withSavepoint) {
+   if (targetDirectory == null) {
+   logAndSysout("Cancelling job " + jobId 
+ " with savepoint to default savepoint directory.");
} else {
-   logAndSysout("Cancelling job " + jobId 
+ '.');
-   client.cancel(jobId);
-   logAndSysout("Cancelled job " + jobId + 
'.');
+   logAndSysout("Cancelling job " + jobId 
+ " with savepoint to " + targetDirectory + '.');
}
+   String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
+   logAndSysout("Cancelled job " + jobId + ". 
Savepoint stored in " + savepointPath + '.');
+   } else {
+   logAndSysout("Cancelling job " + jobId + '.');
+   client.cancel(jobId);
+   logAndSysout("Cancelled job " + jobId + '.');
+   }
 
-   return 0;
-   } finally {
+   return 0;
+   } finally {
+   try {
client.shutdown();
+   } catch (Exception e) {
+   LOG.info("Could not properly shut down the 
client.", e);
}
}
-   catch (Throwable t) {
-   return handleError(t);
-   }
}
 
/**
 * Executes the SAVEPOINT action.
 *
 * @param args Command line arguments for the cancel action.
 */
-   protected int savepoint(String[] args) {
+   protected int savepoint(String[] args) throws CliArgsException {
LOG.info("Running 'savepoint' command.");
 
-   SavepointOptions options;
-   try {
-   options = CliFrontendParser.parseSavepointCommand(args);
-   } catch (CliArgsException e) {
-   return handleArgException(e);
-   } catch (Throwable t) {
-   return handleError(t);
-   }
+   SavepointOptions options = 
CliFrontendParser.parseSavepointCommand(args);
 

[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159430427
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -587,9 +526,7 @@ protected int cancel(String[] args) {
try {
jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
} catch (Exception e) {
-   LOG.error("Error: The value for the Job ID is 
not a valid ID.");
-   System.out.println("Error: The value for the 
Job ID is not a valid ID.");
-   return 1;
+   throw new CliArgsException("The value for the 
JobID is not a valid ID: " + e.getMessage());
--- End diff --

Maybe include the input value in the message as well. For example,
```
try {
    new JobID(StringUtils.hexStringToByte("dea84ec368d886f1638c01447ec6951"));
} catch (Exception e) {
    throw new CliArgsException("The value for the JobID is not a valid ID: " + e.getMessage());
}
```
produces the message:
```
The value for the JobID is not a valid ID: Argument bytes must by an array of 16 bytes
```

Also, this piece of code is duplicated.
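
One way to address both points is a single shared helper that parses the ID and reports the offending input. The following is only a sketch under assumptions: `JobIdParsingSketch`, `parseJobId`, and the nested `CliArgsException` are hypothetical stand-ins, and the hex decoding is written out by hand instead of calling `StringUtils.hexStringToByte`, so that the example is self-contained:

```java
public class JobIdParsingSketch {

    /** Hypothetical stand-in for Flink's CliArgsException. */
    static class CliArgsException extends Exception {
        CliArgsException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** A job ID is 16 bytes, i.e. 32 hexadecimal characters. */
    static final int JOB_ID_BYTES = 16;

    /**
     * Parses a hex string into the 16 bytes of a job ID. On invalid input the
     * thrown exception names the rejected value and the reason.
     */
    static byte[] parseJobId(String jobIdString) throws CliArgsException {
        try {
            if (jobIdString.length() != 2 * JOB_ID_BYTES) {
                throw new IllegalArgumentException(
                    "expected " + (2 * JOB_ID_BYTES) + " hex characters but got " + jobIdString.length());
            }
            byte[] bytes = new byte[JOB_ID_BYTES];
            for (int i = 0; i < JOB_ID_BYTES; i++) {
                int hi = Character.digit(jobIdString.charAt(2 * i), 16);
                int lo = Character.digit(jobIdString.charAt(2 * i + 1), 16);
                if (hi < 0 || lo < 0) {
                    throw new IllegalArgumentException("non-hex character near position " + (2 * i));
                }
                bytes[i] = (byte) ((hi << 4) | lo);
            }
            return bytes;
        } catch (IllegalArgumentException e) {
            // Include the input value so the user sees what was rejected.
            throw new CliArgsException(
                "The value '" + jobIdString + "' is not a valid JobID: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        try {
            parseJobId("dea84ec368d886f1638c01447ec6951"); // 31 characters, one too few
        } catch (CliArgsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Both the cancel and the stop action could then call the same helper, which would remove the duplication.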


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159429333
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -515,33 +464,28 @@ protected int stop(String[] args) {
 
if (stopArgs.length > 0) {
String jobIdString = stopArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   }
-   catch (Exception e) {
-   return handleError(e);
-   }
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

The exception's message won't be super meaningful:
```
new JobID(StringUtils.hexStringToByte("dupa"));
```
```
java.lang.NumberFormatException: For input string: "du"
```
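
To illustrate where the truncated input comes from, here is a small reproduction that uses only the JDK. It assumes the conversion parses the string two characters at a time with `Integer.parseInt(..., 16)`, which is consistent with the message above; `HexPairDemo` and its methods are hypothetical names, not the actual `StringUtils` code.

```java
final class HexPairDemo {
    /** Assumed shape of the conversion: each pair of characters becomes one byte. */
    static byte[] hexStringToByte(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }

    /** Returns the failure message for the given input, or null if it parses. */
    static String failureMessage(String hex) {
        try {
            hexStringToByte(hex);
            return null;
        } catch (NumberFormatException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The message names only the first bad pair, not the full argument.
        System.out.println(failureMessage("dupa"));
    }
}
```

The exception only ever sees the two-character substring, so the user never gets to see the full argument they passed unless the caller adds it to the message.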


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5220#discussion_r159422003
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -882,42 +784,28 @@ protected ClusterClient 
retrieveClient(CommandLineOptions options) {
}
}
 
-   /**
-* Retrieves the {@link ActorGateway} for the JobManager. The 
ClusterClient is retrieved
-* from the provided {@link CommandLineOptions}.
-*
-* @param options CommandLineOptions specifying the JobManager URL
-* @return Gateway to the JobManager
-* @throws Exception
-*/
-   protected ActorGateway getJobManagerGateway(CommandLineOptions options) 
throws Exception {
-   logAndSysout("Retrieving JobManager.");
-   return retrieveClient(options).getJobManagerGateway();
-   }
-
/**
 * Creates a {@link ClusterClient} object from the given command line 
options and other parameters.
-* @param options Command line options
+* @param customCommandLine custom command line to use to retrieve the 
client
+* @param commandLine command line to use
 * @param program The program for which to create the client.
 * @throws Exception
 */
protected ClusterClient createClient(
-   CommandLineOptions options,
+   CustomCommandLine customCommandLine,
--- End diff --

Was it really necessary to change the signature? It seems that the behavior 
is otherwise the same. Why not leave the line 
```
CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
```
inside this method?
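
Roughly what I mean, as a heavily simplified sketch: every type here (`CustomCommandLineSketch`, `FlagCommandLine`, `CliFrontendSketch`) is a hypothetical stand-in, the command line is modelled as a plain list of strings, and `createClient` returns a description instead of a real `ClusterClient`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for CustomCommandLine. */
interface CustomCommandLineSketch {
    boolean isActive(List<String> commandLine);
    String getId();
}

/** Toy implementation that is active when a given flag is present. */
final class FlagCommandLine implements CustomCommandLineSketch {
    private final String id;
    private final String flag;

    FlagCommandLine(String id, String flag) {
        this.id = id;
        this.flag = flag;
    }

    @Override
    public boolean isActive(List<String> commandLine) {
        return commandLine.contains(flag);
    }

    @Override
    public String getId() {
        return id;
    }
}

final class CliFrontendSketch {
    private final List<CustomCommandLineSketch> customCommandLines;

    CliFrontendSketch(List<CustomCommandLineSketch> customCommandLines) {
        this.customCommandLines = customCommandLines;
    }

    /** Picks the first custom command line that declares itself active. */
    CustomCommandLineSketch getActiveCustomCommandLine(List<String> commandLine) {
        for (CustomCommandLineSketch cli : customCommandLines) {
            if (cli.isActive(commandLine)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command line is active for " + commandLine);
    }

    /** Callers pass only the options; the lookup stays inside the method. */
    String createClient(List<String> commandLine, String program) {
        CustomCommandLineSketch active = getActiveCustomCommandLine(commandLine);
        return active.getId() + " client for " + program;
    }

    public static void main(String[] args) {
        CliFrontendSketch frontend = new CliFrontendSketch(
            Arrays.<CustomCommandLineSketch>asList(
                new FlagCommandLine("yarn", "-yid"),
                new FlagCommandLine("standalone", "-m")));
        System.out.println(frontend.createClient(Arrays.asList("-m", "host:1234"), "job.jar"));
    }
}
```

The trade-off is that callers cannot reuse an already resolved command line, which may be the reason for the new signature.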


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2017-12-31 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5220

[FLINK-8333] [flip6] Separate deployment options from command options

## What is the purpose of the change

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.

This PR is based on #5219. 

## Brief change log

- Move `CliFrontend` to `org.apache.flink.client.cli`
- Decouple creation of `RunOptions`, `ListOptions`, etc. from 
`CliFrontendParser`

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink splitCliOptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5220.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5220


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 0cd22bf559eb820f3e2d381686752f583f4f16ff
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

commit 2b4c1efb79ffdfd1423240047dce8e2f038958c7
Author: Till Rohrmann 
Date:   2017-12-20T16:19:59Z

[FLINK-8333] [flip6] Separate deployment options from command options

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.




---