[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5219


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r160706043
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -178,65 +185,30 @@ public void testTriggerSavepointCustomTarget() throws 
Exception {
public void testDisposeSavepointSuccess() throws Exception {
replaceStdOutAndStdErr();
 
-   try {
-   String savepointPath = "expectedSavepointPath";
-   ActorGateway jobManager = mock(ActorGateway.class);
+   String savepointPath = "expectedSavepointPath";
 
-   Promise triggerResponse = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+   final CompletableFuture disposeCallFuture = new 
CompletableFuture<>();
 
-   when(jobManager.ask(
-   Mockito.eq(new 
DisposeSavepoint(savepointPath)),
-   
any(FiniteDuration.class))).thenReturn(triggerResponse.future());
+   ClusterClient clusterClient = new 
DisposeSavepointClusterClient((String path, Time timeout) -> {
+   disposeCallFuture.complete(Acknowledge.get());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   });
 
-   triggerResponse.success(getDisposeSavepointSuccess());
+   try {
 
-   CliFrontend frontend = new MockCliFrontend(
-   CliFrontendTestUtils.getConfigDir(), 
jobManager);
+   CliFrontend frontend = new 
MockedCliFrontend(clusterClient);
 
String[] parameters = { "-d", savepointPath };
-   int returnCode = frontend.savepoint(parameters);
+   frontend.savepoint(parameters);
 
-   assertEquals(0, returnCode);
-   verify(jobManager, times(1)).ask(
-   Mockito.eq(new 
DisposeSavepoint(savepointPath)),
-   any(FiniteDuration.class));
+   disposeCallFuture.get();
--- End diff --

True. Will remove it.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r160704416
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -178,65 +185,30 @@ public void testTriggerSavepointCustomTarget() throws 
Exception {
public void testDisposeSavepointSuccess() throws Exception {
replaceStdOutAndStdErr();
 
-   try {
-   String savepointPath = "expectedSavepointPath";
-   ActorGateway jobManager = mock(ActorGateway.class);
+   String savepointPath = "expectedSavepointPath";
 
-   Promise triggerResponse = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+   final CompletableFuture disposeCallFuture = new 
CompletableFuture<>();
 
-   when(jobManager.ask(
-   Mockito.eq(new 
DisposeSavepoint(savepointPath)),
-   
any(FiniteDuration.class))).thenReturn(triggerResponse.future());
+   ClusterClient clusterClient = new 
DisposeSavepointClusterClient((String path, Time timeout) -> {
+   disposeCallFuture.complete(Acknowledge.get());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   });
 
-   triggerResponse.success(getDisposeSavepointSuccess());
+   try {
 
-   CliFrontend frontend = new MockCliFrontend(
-   CliFrontendTestUtils.getConfigDir(), 
jobManager);
+   CliFrontend frontend = new 
MockedCliFrontend(clusterClient);
 
String[] parameters = { "-d", savepointPath };
-   int returnCode = frontend.savepoint(parameters);
+   frontend.savepoint(parameters);
 
-   assertEquals(0, returnCode);
-   verify(jobManager, times(1)).ask(
-   Mockito.eq(new 
DisposeSavepoint(savepointPath)),
-   any(FiniteDuration.class));
+   disposeCallFuture.get();
 
String outMsg = buffer.toString();
assertTrue(outMsg.contains(savepointPath));
assertTrue(outMsg.contains("disposed"));
}
finally {
-   restoreStdOutAndStdErr();
-   }
-   }
-
-   /**
-* Tests that a disposal failure due a  ClassNotFoundException triggers 
a
-* note about the JAR option.
-*/
-   @Test
-   public void testDisposeClassNotFoundException() throws Exception {
--- End diff --

Yes indeed, will add it with the referenced commit.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-10 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r160699710
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -128,13 +131,14 @@ public void testTriggerSavepointFailureIllegalJobID() 
throws Exception {
replaceStdOutAndStdErr();
 
try {
-   CliFrontend frontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   CliFrontend frontend = new MockedCliFrontend(new 
StandaloneClusterClient(
+   new Configuration(),
+   new TestingHighAvailabilityServices()));
 
String[] parameters = { "invalid job id" };
int returnCode = frontend.savepoint(parameters);
 
-   assertTrue(returnCode != 0);
-   assertTrue(buffer.toString().contains("not a valid 
ID"));
--- End diff --

I think it's not important. We all know it will get removed anyways.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r160699257
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -149,23 +153,26 @@ public void testTriggerSavepointFailureIllegalJobID() 
throws Exception {
public void testTriggerSavepointCustomTarget() throws Exception {
replaceStdOutAndStdErr();
 
-   try {
-   JobID jobId = new JobID();
+   JobID jobId = new JobID();
 
-   String savepointDirectory = "customTargetDirectory";
+   String savepointDirectory = "customTargetDirectory";
 
-   MockedCliFrontend frontend = new 
SavepointTestCliFrontend(savepointDirectory);
+   final ClusterClient clusterClient = 
createClusterClient(savepointDirectory);
+
+   try {
+   MockedCliFrontend frontend = new 
MockedCliFrontend(clusterClient);
 
String[] parameters = { jobId.toString(), 
savepointDirectory };
-   int returnCode = frontend.savepoint(parameters);
+   frontend.savepoint(parameters);
 
-   assertEquals(0, returnCode);
--- End diff --

Backport of tests.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r160699213
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -128,13 +131,14 @@ public void testTriggerSavepointFailureIllegalJobID() 
throws Exception {
replaceStdOutAndStdErr();
 
try {
-   CliFrontend frontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   CliFrontend frontend = new MockedCliFrontend(new 
StandaloneClusterClient(
+   new Configuration(),
+   new TestingHighAvailabilityServices()));
 
String[] parameters = { "invalid job id" };
int returnCode = frontend.savepoint(parameters);
 
-   assertTrue(returnCode != 0);
-   assertTrue(buffer.toString().contains("not a valid 
ID"));
--- End diff --

True. Will re-add for the sake of correctness.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r160698812
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -77,23 +74,25 @@
public void testTriggerSavepointSuccess() throws Exception {
replaceStdOutAndStdErr();
 
-   try {
-   JobID jobId = new JobID();
+   JobID jobId = new JobID();
 
-   String savepointPath = "expectedSavepointPath";
+   String savepointPath = "expectedSavepointPath";
 
-   MockedCliFrontend frontend = new 
SavepointTestCliFrontend(savepointPath);
+   final ClusterClient clusterClient = 
createClusterClient(savepointPath);
+
+   try {
+   MockedCliFrontend frontend = new 
MockedCliFrontend(clusterClient);
 
String[] parameters = { jobId.toString() };
-   int returnCode = frontend.savepoint(parameters);
+   frontend.savepoint(parameters);
 
-   assertEquals(0, returnCode);
--- End diff --

Because I moved the tests from a later commit where I change the signature 
of the `savepoint` command to return `void` and throw an `Exception` in case of 
a failure.  Will add the assertions again.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r160697901
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
+   } catch (Exception e) {
+   return handleArgException(new 
IllegalArgumentException(
+   "Error: The value for 
the Job ID is not a valid ID."));
+   }
+   } else {
+   return handleArgException(new 
IllegalArgumentException(
"Error: The value for the Job 
ID is not a valid ID. " +
-   "Specify a Job 
ID to trigger a savepoint."));
-   }
+   "Specify a Job ID to 
trigger a savepoint."));
+   }
 
-   String savepointDirectory = null;
-   if (cleanedArgs.length >= 2) {
-   savepointDirectory = cleanedArgs[1];
-   }
+   String savepointDirectory = null;
+   if (cleanedArgs.length >= 2) {
+   savepointDirectory = cleanedArgs[1];
+   }
 
-   // Print superfluous arguments
-   if (cleanedArgs.length >= 3) {
-   logAndSysout("Provided more arguments than 
required. Ignoring not needed arguments.");
-   }
+   // Print superfluous arguments
+   if (cleanedArgs.length >= 3) {
+   logAndSysout("Provided more arguments 
than required. Ignoring not needed arguments.");
+   }
 
-   return triggerSavepoint(options, jobId, 
savepointDirectory);
+   return triggerSavepoint(clusterClient, jobId, 
savepointDirectory);
+   }
+   } catch (Exception e) {
+   return handleError(e);
+   } finally {
+   try {
+   clusterClient.shutdown();
+   } catch (Exception e) {
+   LOG.info("Could not shutdown the cluster 
client.", e);
+   }
}
}
 
/**
 * Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
 * message to the job manager.
 */
-   private int triggerSavepoint(SavepointOptions options, JobID jobId, 
String 

[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r160697769
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

Good catch. Will change it.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159401004
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
+   } catch (Exception e) {
+   return handleArgException(new 
IllegalArgumentException(
+   "Error: The value for 
the Job ID is not a valid ID."));
+   }
+   } else {
+   return handleArgException(new 
IllegalArgumentException(
"Error: The value for the Job 
ID is not a valid ID. " +
-   "Specify a Job 
ID to trigger a savepoint."));
-   }
+   "Specify a Job ID to 
trigger a savepoint."));
+   }
 
-   String savepointDirectory = null;
-   if (cleanedArgs.length >= 2) {
-   savepointDirectory = cleanedArgs[1];
-   }
+   String savepointDirectory = null;
+   if (cleanedArgs.length >= 2) {
+   savepointDirectory = cleanedArgs[1];
+   }
 
-   // Print superfluous arguments
-   if (cleanedArgs.length >= 3) {
-   logAndSysout("Provided more arguments than 
required. Ignoring not needed arguments.");
-   }
+   // Print superfluous arguments
+   if (cleanedArgs.length >= 3) {
+   logAndSysout("Provided more arguments 
than required. Ignoring not needed arguments.");
+   }
 
-   return triggerSavepoint(options, jobId, 
savepointDirectory);
+   return triggerSavepoint(clusterClient, jobId, 
savepointDirectory);
+   }
+   } catch (Exception e) {
+   return handleError(e);
+   } finally {
+   try {
+   clusterClient.shutdown();
+   } catch (Exception e) {
+   LOG.info("Could not shutdown the cluster 
client.", e);
+   }
}
}
 
/**
 * Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
 * message to the job manager.
 */
-   private int triggerSavepoint(SavepointOptions options, JobID jobId, 
String savepointDirectory) 

[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159252679
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -77,23 +74,25 @@
public void testTriggerSavepointSuccess() throws Exception {
replaceStdOutAndStdErr();
 
-   try {
-   JobID jobId = new JobID();
+   JobID jobId = new JobID();
 
-   String savepointPath = "expectedSavepointPath";
+   String savepointPath = "expectedSavepointPath";
 
-   MockedCliFrontend frontend = new 
SavepointTestCliFrontend(savepointPath);
+   final ClusterClient clusterClient = 
createClusterClient(savepointPath);
+
+   try {
+   MockedCliFrontend frontend = new 
MockedCliFrontend(clusterClient);
 
String[] parameters = { jobId.toString() };
-   int returnCode = frontend.savepoint(parameters);
+   frontend.savepoint(parameters);
 
-   assertEquals(0, returnCode);
--- End diff --

Is there a reason why this assertion was removed? I think it would still 
pass.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159402067
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -178,65 +185,30 @@ public void testTriggerSavepointCustomTarget() throws 
Exception {
public void testDisposeSavepointSuccess() throws Exception {
replaceStdOutAndStdErr();
 
-   try {
-   String savepointPath = "expectedSavepointPath";
-   ActorGateway jobManager = mock(ActorGateway.class);
+   String savepointPath = "expectedSavepointPath";
 
-   Promise triggerResponse = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+   final CompletableFuture disposeCallFuture = new 
CompletableFuture<>();
 
-   when(jobManager.ask(
-   Mockito.eq(new 
DisposeSavepoint(savepointPath)),
-   
any(FiniteDuration.class))).thenReturn(triggerResponse.future());
+   ClusterClient clusterClient = new 
DisposeSavepointClusterClient((String path, Time timeout) -> {
+   disposeCallFuture.complete(Acknowledge.get());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   });
 
-   triggerResponse.success(getDisposeSavepointSuccess());
+   try {
 
-   CliFrontend frontend = new MockCliFrontend(
-   CliFrontendTestUtils.getConfigDir(), 
jobManager);
+   CliFrontend frontend = new 
MockedCliFrontend(clusterClient);
 
String[] parameters = { "-d", savepointPath };
-   int returnCode = frontend.savepoint(parameters);
+   frontend.savepoint(parameters);
 
-   assertEquals(0, returnCode);
-   verify(jobManager, times(1)).ask(
-   Mockito.eq(new 
DisposeSavepoint(savepointPath)),
-   any(FiniteDuration.class));
+   disposeCallFuture.get();
--- End diff --

Everything is run in the main thread:
```
@Override
public CompletableFuture disposeSavepoint(String 
savepointPath, Time timeout) {
return disposeSavepointFunction.apply(savepointPath, 
timeout);
}
```
There is no need to wait for the completion of this future. The future can 
be removed.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159403066
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -178,65 +185,30 @@ public void testTriggerSavepointCustomTarget() throws 
Exception {
public void testDisposeSavepointSuccess() throws Exception {
replaceStdOutAndStdErr();
 
-   try {
-   String savepointPath = "expectedSavepointPath";
-   ActorGateway jobManager = mock(ActorGateway.class);
+   String savepointPath = "expectedSavepointPath";
 
-   Promise triggerResponse = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+   final CompletableFuture disposeCallFuture = new 
CompletableFuture<>();
 
-   when(jobManager.ask(
-   Mockito.eq(new 
DisposeSavepoint(savepointPath)),
-   
any(FiniteDuration.class))).thenReturn(triggerResponse.future());
+   ClusterClient clusterClient = new 
DisposeSavepointClusterClient((String path, Time timeout) -> {
+   disposeCallFuture.complete(Acknowledge.get());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   });
 
-   triggerResponse.success(getDisposeSavepointSuccess());
+   try {
 
-   CliFrontend frontend = new MockCliFrontend(
-   CliFrontendTestUtils.getConfigDir(), 
jobManager);
+   CliFrontend frontend = new 
MockedCliFrontend(clusterClient);
 
String[] parameters = { "-d", savepointPath };
-   int returnCode = frontend.savepoint(parameters);
+   frontend.savepoint(parameters);
 
-   assertEquals(0, returnCode);
-   verify(jobManager, times(1)).ask(
-   Mockito.eq(new 
DisposeSavepoint(savepointPath)),
-   any(FiniteDuration.class));
+   disposeCallFuture.get();
 
String outMsg = buffer.toString();
assertTrue(outMsg.contains(savepointPath));
assertTrue(outMsg.contains("disposed"));
}
finally {
-   restoreStdOutAndStdErr();
-   }
-   }
-
-   /**
-* Tests that a disposal failure due a  ClassNotFoundException triggers 
a
-* note about the JAR option.
-*/
-   @Test
-   public void testDisposeClassNotFoundException() throws Exception {
--- End diff --

The behaviour moved to `ClusterClient` but we are not testing it anymore.
  


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159400326
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -149,23 +153,26 @@ public void testTriggerSavepointFailureIllegalJobID() 
throws Exception {
public void testTriggerSavepointCustomTarget() throws Exception {
replaceStdOutAndStdErr();
 
-   try {
-   JobID jobId = new JobID();
+   JobID jobId = new JobID();
 
-   String savepointDirectory = "customTargetDirectory";
+   String savepointDirectory = "customTargetDirectory";
 
-   MockedCliFrontend frontend = new 
SavepointTestCliFrontend(savepointDirectory);
+   final ClusterClient clusterClient = 
createClusterClient(savepointDirectory);
+
+   try {
+   MockedCliFrontend frontend = new 
MockedCliFrontend(clusterClient);
 
String[] parameters = { jobId.toString(), 
savepointDirectory };
-   int returnCode = frontend.savepoint(parameters);
+   frontend.savepoint(parameters);
 
-   assertEquals(0, returnCode);
--- End diff --

Is there a reason why this assertion was removed? I think it would still 
pass.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159255690
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 ---
@@ -128,13 +131,14 @@ public void testTriggerSavepointFailureIllegalJobID() 
throws Exception {
replaceStdOutAndStdErr();
 
try {
-   CliFrontend frontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+   CliFrontend frontend = new MockedCliFrontend(new 
StandaloneClusterClient(
+   new Configuration(),
+   new TestingHighAvailabilityServices()));
 
String[] parameters = { "invalid job id" };
int returnCode = frontend.savepoint(parameters);
 
-   assertTrue(returnCode != 0);
-   assertTrue(buffer.toString().contains("not a valid 
ID"));
--- End diff --

What happened to this assertion? It would still pass.


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159249258
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

nit: `JobID.fromHexString(jobIdString)`


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159248959
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

`JobID.fromHexString`


---


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2017-12-31 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5219

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

## What is the purpose of the change

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink refactorSavepointCommand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5219


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 0cd22bf559eb820f3e2d381686752f583f4f16ff
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.




---