[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5219 ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r160706043 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -178,65 +185,30 @@ public void testTriggerSavepointCustomTarget() throws Exception { public void testDisposeSavepointSuccess() throws Exception { replaceStdOutAndStdErr(); - try { - String savepointPath = "expectedSavepointPath"; - ActorGateway jobManager = mock(ActorGateway.class); + String savepointPath = "expectedSavepointPath"; - Promise triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); + final CompletableFuture disposeCallFuture = new CompletableFuture<>(); - when(jobManager.ask( - Mockito.eq(new DisposeSavepoint(savepointPath)), - any(FiniteDuration.class))).thenReturn(triggerResponse.future()); + ClusterClient clusterClient = new DisposeSavepointClusterClient((String path, Time timeout) -> { + disposeCallFuture.complete(Acknowledge.get()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); - triggerResponse.success(getDisposeSavepointSuccess()); + try { - CliFrontend frontend = new MockCliFrontend( - CliFrontendTestUtils.getConfigDir(), jobManager); + CliFrontend frontend = new MockedCliFrontend(clusterClient); String[] parameters = { "-d", savepointPath }; - int returnCode = frontend.savepoint(parameters); + frontend.savepoint(parameters); - assertEquals(0, returnCode); - verify(jobManager, times(1)).ask( - Mockito.eq(new DisposeSavepoint(savepointPath)), - any(FiniteDuration.class)); + disposeCallFuture.get(); --- End diff -- True. Will remove it. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r160704416 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -178,65 +185,30 @@ public void testTriggerSavepointCustomTarget() throws Exception { public void testDisposeSavepointSuccess() throws Exception { replaceStdOutAndStdErr(); - try { - String savepointPath = "expectedSavepointPath"; - ActorGateway jobManager = mock(ActorGateway.class); + String savepointPath = "expectedSavepointPath"; - Promise triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); + final CompletableFuture disposeCallFuture = new CompletableFuture<>(); - when(jobManager.ask( - Mockito.eq(new DisposeSavepoint(savepointPath)), - any(FiniteDuration.class))).thenReturn(triggerResponse.future()); + ClusterClient clusterClient = new DisposeSavepointClusterClient((String path, Time timeout) -> { + disposeCallFuture.complete(Acknowledge.get()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); - triggerResponse.success(getDisposeSavepointSuccess()); + try { - CliFrontend frontend = new MockCliFrontend( - CliFrontendTestUtils.getConfigDir(), jobManager); + CliFrontend frontend = new MockedCliFrontend(clusterClient); String[] parameters = { "-d", savepointPath }; - int returnCode = frontend.savepoint(parameters); + frontend.savepoint(parameters); - assertEquals(0, returnCode); - verify(jobManager, times(1)).ask( - Mockito.eq(new DisposeSavepoint(savepointPath)), - any(FiniteDuration.class)); + disposeCallFuture.get(); String outMsg = buffer.toString(); assertTrue(outMsg.contains(savepointPath)); assertTrue(outMsg.contains("disposed")); } finally { - restoreStdOutAndStdErr(); - } - } - - /** -* Tests that a disposal failure due a ClassNotFoundException triggers a -* note about the JAR option. -*/ - @Test - public void testDisposeClassNotFoundException() throws Exception { --- End diff -- Yes indeed, will add it with the referenced commit. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r160699710 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -128,13 +131,14 @@ public void testTriggerSavepointFailureIllegalJobID() throws Exception { replaceStdOutAndStdErr(); try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + CliFrontend frontend = new MockedCliFrontend(new StandaloneClusterClient( + new Configuration(), + new TestingHighAvailabilityServices())); String[] parameters = { "invalid job id" }; int returnCode = frontend.savepoint(parameters); - assertTrue(returnCode != 0); - assertTrue(buffer.toString().contains("not a valid ID")); --- End diff -- I think it's not important. We all know it will get removed anyways. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r160699257 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -149,23 +153,26 @@ public void testTriggerSavepointFailureIllegalJobID() throws Exception { public void testTriggerSavepointCustomTarget() throws Exception { replaceStdOutAndStdErr(); - try { - JobID jobId = new JobID(); + JobID jobId = new JobID(); - String savepointDirectory = "customTargetDirectory"; + String savepointDirectory = "customTargetDirectory"; - MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory); + final ClusterClient clusterClient = createClusterClient(savepointDirectory); + + try { + MockedCliFrontend frontend = new MockedCliFrontend(clusterClient); String[] parameters = { jobId.toString(), savepointDirectory }; - int returnCode = frontend.savepoint(parameters); + frontend.savepoint(parameters); - assertEquals(0, returnCode); --- End diff -- Backport of tests. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r160699213 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -128,13 +131,14 @@ public void testTriggerSavepointFailureIllegalJobID() throws Exception { replaceStdOutAndStdErr(); try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + CliFrontend frontend = new MockedCliFrontend(new StandaloneClusterClient( + new Configuration(), + new TestingHighAvailabilityServices())); String[] parameters = { "invalid job id" }; int returnCode = frontend.savepoint(parameters); - assertTrue(returnCode != 0); - assertTrue(buffer.toString().contains("not a valid ID")); --- End diff -- True. Will re-add for the sake of correctness. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r160698812 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -77,23 +74,25 @@ public void testTriggerSavepointSuccess() throws Exception { replaceStdOutAndStdErr(); - try { - JobID jobId = new JobID(); + JobID jobId = new JobID(); - String savepointPath = "expectedSavepointPath"; + String savepointPath = "expectedSavepointPath"; - MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath); + final ClusterClient clusterClient = createClusterClient(savepointPath); + + try { + MockedCliFrontend frontend = new MockedCliFrontend(clusterClient); String[] parameters = { jobId.toString() }; - int returnCode = frontend.savepoint(parameters); + frontend.savepoint(parameters); - assertEquals(0, returnCode); --- End diff -- Because I moved the tests from a later commit where I change the signature of the `savepoint` command to return `void` and throw an `Exception` in case of a failure. Will add the assertions again. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r160697901 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + } catch (Exception e) { + return handleArgException(new IllegalArgumentException( + "Error: The value for the Job ID is not a valid ID.")); + } + } else { + return handleArgException(new IllegalArgumentException( "Error: The value for the Job ID is not a valid ID. " + - "Specify a Job ID to trigger a savepoint.")); - } + "Specify a Job ID to trigger a savepoint.")); + } - String savepointDirectory = null; - if (cleanedArgs.length >= 2) { - savepointDirectory = cleanedArgs[1]; - } + String savepointDirectory = null; + if (cleanedArgs.length >= 2) { + savepointDirectory = cleanedArgs[1]; + } - // Print superfluous arguments - if (cleanedArgs.length >= 3) { - logAndSysout("Provided more arguments than required. Ignoring not needed arguments."); - } + // Print superfluous arguments + if (cleanedArgs.length >= 3) { + logAndSysout("Provided more arguments than required. Ignoring not needed arguments."); + } - return triggerSavepoint(options, jobId, savepointDirectory); + return triggerSavepoint(clusterClient, jobId, savepointDirectory); + } + } catch (Exception e) { + return handleError(e); + } finally { + try { + clusterClient.shutdown(); + } catch (Exception e) { + LOG.info("Could not shutdown the cluster client.", e); + } } } /** * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint} * message to the job manager. */ - private int triggerSavepoint(SavepointOptions options, JobID jobId, String
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r160697769 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- Good catch. Will change it. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159401004 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + } catch (Exception e) { + return handleArgException(new IllegalArgumentException( + "Error: The value for the Job ID is not a valid ID.")); + } + } else { + return handleArgException(new IllegalArgumentException( "Error: The value for the Job ID is not a valid ID. " + - "Specify a Job ID to trigger a savepoint.")); - } + "Specify a Job ID to trigger a savepoint.")); + } - String savepointDirectory = null; - if (cleanedArgs.length >= 2) { - savepointDirectory = cleanedArgs[1]; - } + String savepointDirectory = null; + if (cleanedArgs.length >= 2) { + savepointDirectory = cleanedArgs[1]; + } - // Print superfluous arguments - if (cleanedArgs.length >= 3) { - logAndSysout("Provided more arguments than required. Ignoring not needed arguments."); - } + // Print superfluous arguments + if (cleanedArgs.length >= 3) { + logAndSysout("Provided more arguments than required. Ignoring not needed arguments."); + } - return triggerSavepoint(options, jobId, savepointDirectory); + return triggerSavepoint(clusterClient, jobId, savepointDirectory); + } + } catch (Exception e) { + return handleError(e); + } finally { + try { + clusterClient.shutdown(); + } catch (Exception e) { + LOG.info("Could not shutdown the cluster client.", e); + } } } /** * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint} * message to the job manager. */ - private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory)
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159252679 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -77,23 +74,25 @@ public void testTriggerSavepointSuccess() throws Exception { replaceStdOutAndStdErr(); - try { - JobID jobId = new JobID(); + JobID jobId = new JobID(); - String savepointPath = "expectedSavepointPath"; + String savepointPath = "expectedSavepointPath"; - MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath); + final ClusterClient clusterClient = createClusterClient(savepointPath); + + try { + MockedCliFrontend frontend = new MockedCliFrontend(clusterClient); String[] parameters = { jobId.toString() }; - int returnCode = frontend.savepoint(parameters); + frontend.savepoint(parameters); - assertEquals(0, returnCode); --- End diff -- Is there a reason why this assertion was removed? I think it would still pass. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159402067 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -178,65 +185,30 @@ public void testTriggerSavepointCustomTarget() throws Exception { public void testDisposeSavepointSuccess() throws Exception { replaceStdOutAndStdErr(); - try { - String savepointPath = "expectedSavepointPath"; - ActorGateway jobManager = mock(ActorGateway.class); + String savepointPath = "expectedSavepointPath"; - Promise triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); + final CompletableFuture disposeCallFuture = new CompletableFuture<>(); - when(jobManager.ask( - Mockito.eq(new DisposeSavepoint(savepointPath)), - any(FiniteDuration.class))).thenReturn(triggerResponse.future()); + ClusterClient clusterClient = new DisposeSavepointClusterClient((String path, Time timeout) -> { + disposeCallFuture.complete(Acknowledge.get()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); - triggerResponse.success(getDisposeSavepointSuccess()); + try { - CliFrontend frontend = new MockCliFrontend( - CliFrontendTestUtils.getConfigDir(), jobManager); + CliFrontend frontend = new MockedCliFrontend(clusterClient); String[] parameters = { "-d", savepointPath }; - int returnCode = frontend.savepoint(parameters); + frontend.savepoint(parameters); - assertEquals(0, returnCode); - verify(jobManager, times(1)).ask( - Mockito.eq(new DisposeSavepoint(savepointPath)), - any(FiniteDuration.class)); + disposeCallFuture.get(); --- End diff -- Everything is run in the main thread: ``` @Override public CompletableFuture disposeSavepoint(String savepointPath, Time timeout) { return disposeSavepointFunction.apply(savepointPath, timeout); } ``` There is no need to wait for the completion of this future. The future can be removed. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159403066 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -178,65 +185,30 @@ public void testTriggerSavepointCustomTarget() throws Exception { public void testDisposeSavepointSuccess() throws Exception { replaceStdOutAndStdErr(); - try { - String savepointPath = "expectedSavepointPath"; - ActorGateway jobManager = mock(ActorGateway.class); + String savepointPath = "expectedSavepointPath"; - Promise triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); + final CompletableFuture disposeCallFuture = new CompletableFuture<>(); - when(jobManager.ask( - Mockito.eq(new DisposeSavepoint(savepointPath)), - any(FiniteDuration.class))).thenReturn(triggerResponse.future()); + ClusterClient clusterClient = new DisposeSavepointClusterClient((String path, Time timeout) -> { + disposeCallFuture.complete(Acknowledge.get()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); - triggerResponse.success(getDisposeSavepointSuccess()); + try { - CliFrontend frontend = new MockCliFrontend( - CliFrontendTestUtils.getConfigDir(), jobManager); + CliFrontend frontend = new MockedCliFrontend(clusterClient); String[] parameters = { "-d", savepointPath }; - int returnCode = frontend.savepoint(parameters); + frontend.savepoint(parameters); - assertEquals(0, returnCode); - verify(jobManager, times(1)).ask( - Mockito.eq(new DisposeSavepoint(savepointPath)), - any(FiniteDuration.class)); + disposeCallFuture.get(); String outMsg = buffer.toString(); assertTrue(outMsg.contains(savepointPath)); assertTrue(outMsg.contains("disposed")); } finally { - restoreStdOutAndStdErr(); - } - } - - /** -* Tests that a disposal failure due a ClassNotFoundException triggers a -* note about the JAR option. -*/ - @Test - public void testDisposeClassNotFoundException() throws Exception { --- End diff -- The behaviour moved to `ClusterClient` but we are not testing it anymore. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159400326 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -149,23 +153,26 @@ public void testTriggerSavepointFailureIllegalJobID() throws Exception { public void testTriggerSavepointCustomTarget() throws Exception { replaceStdOutAndStdErr(); - try { - JobID jobId = new JobID(); + JobID jobId = new JobID(); - String savepointDirectory = "customTargetDirectory"; + String savepointDirectory = "customTargetDirectory"; - MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory); + final ClusterClient clusterClient = createClusterClient(savepointDirectory); + + try { + MockedCliFrontend frontend = new MockedCliFrontend(clusterClient); String[] parameters = { jobId.toString(), savepointDirectory }; - int returnCode = frontend.savepoint(parameters); + frontend.savepoint(parameters); - assertEquals(0, returnCode); --- End diff -- Is there a reason why this assertion was removed? I think it would still pass. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159255690 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java --- @@ -128,13 +131,14 @@ public void testTriggerSavepointFailureIllegalJobID() throws Exception { replaceStdOutAndStdErr(); try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + CliFrontend frontend = new MockedCliFrontend(new StandaloneClusterClient( + new Configuration(), + new TestingHighAvailabilityServices())); String[] parameters = { "invalid job id" }; int returnCode = frontend.savepoint(parameters); - assertTrue(returnCode != 0); - assertTrue(buffer.toString().contains("not a valid ID")); --- End diff -- What happened to this assertion? It would still pass. ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159249258 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- nit: `JobID.fromHexString(jobIdString)` ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159248959 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- `JobID.fromHexString` ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5219 [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient ## What is the purpose of the change Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink refactorSavepointCommand Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5219.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5219 commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b Author: Till RohrmannDate: 2017-12-07T12:57:24Z [FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns. commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 0cd22bf559eb820f3e2d381686752f583f4f16ff Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. ---