[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6

2018-03-23 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/5715


---


[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5715#discussion_r176663967
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 ---
@@ -528,54 +454,44 @@ public void 
testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope
}
 
try {
-   jobManager = 
cluster.getLeaderGateway(deadline.timeLeft());
-
JobGraph jobGraph = 
createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
 
-   jobID = jobGraph.getJobID();
-
-   cluster.submitJobDetached(jobGraph);
+   final JobID jobID = jobGraph.getJobID();
 
-   Object savepointResponse = null;
+   client.setDetached(true);
+   client.submitJob(jobGraph, 
RescalingITCase.class.getClassLoader());
 
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
 
-   while (deadline.hasTimeLeft()) {
-
-   Future savepointPathFuture = 
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, 
Option.empty()), deadline.timeLeft());
-   FiniteDuration waitingTime = new 
FiniteDuration(10, TimeUnit.SECONDS);
-   savepointResponse = 
Await.result(savepointPathFuture, waitingTime);
-
-   if (savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess) {
-   break;
-   }
-   }
-
-   assertTrue(savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess);
-
-   final String savepointPath = 
((JobManagerMessages.TriggerSavepointSuccess) 
savepointResponse).savepointPath();
-
-   Future jobRemovedFuture = jobManager.ask(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
-   Future cancellationResponseFuture = 
jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
-   Object cancellationResponse = 
Await.result(cancellationResponseFuture, deadline.timeLeft());
+   CompletableFuture savepointPathFuture = 
FutureUtils.retryWithDelay(
+   () -> {
+   try {
+   return 
client.triggerSavepoint(jobID, null);
+   } catch (FlinkException e) {
+   throw new RuntimeException(e);
--- End diff --

yes, I'll change it and merge afterwards.


---


[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5715#discussion_r176475682
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 ---
@@ -528,54 +454,44 @@ public void 
testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope
}
 
try {
-   jobManager = 
cluster.getLeaderGateway(deadline.timeLeft());
-
JobGraph jobGraph = 
createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
 
-   jobID = jobGraph.getJobID();
-
-   cluster.submitJobDetached(jobGraph);
+   final JobID jobID = jobGraph.getJobID();
 
-   Object savepointResponse = null;
+   client.setDetached(true);
+   client.submitJob(jobGraph, 
RescalingITCase.class.getClassLoader());
 
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
 
-   while (deadline.hasTimeLeft()) {
-
-   Future savepointPathFuture = 
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, 
Option.empty()), deadline.timeLeft());
-   FiniteDuration waitingTime = new 
FiniteDuration(10, TimeUnit.SECONDS);
-   savepointResponse = 
Await.result(savepointPathFuture, waitingTime);
-
-   if (savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess) {
-   break;
-   }
-   }
-
-   assertTrue(savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess);
-
-   final String savepointPath = 
((JobManagerMessages.TriggerSavepointSuccess) 
savepointResponse).savepointPath();
-
-   Future jobRemovedFuture = jobManager.ask(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
-   Future cancellationResponseFuture = 
jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
-   Object cancellationResponse = 
Await.result(cancellationResponseFuture, deadline.timeLeft());
+   CompletableFuture savepointPathFuture = 
FutureUtils.retryWithDelay(
+   () -> {
+   try {
+   return 
client.triggerSavepoint(jobID, null);
+   } catch (FlinkException e) {
+   throw new RuntimeException(e);
--- End diff --

Shouldn't we return a exceptionally completed future here?


---


[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6

2018-03-19 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5715

[FLINK-8956][tests] Port RescalingITCase to flip6 

Based on #5690.

## What is the purpose of the change

Ports the `RescalingITCase` to use `MiniClusterResource`.

## Verifying this change

Run `RescalingITCase` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8956

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5715.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5715






---