[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5090


---


2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5090#discussion_r156882122
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
@@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that unused offered slots are directly used to fulfil pending slot
--- End diff --

True, will use the American version.


---


2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5090#discussion_r156882139
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
@@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that unused offered slots are directly used to fulfil pending slot
+	 * requests.
+	 *
+	 * See FLINK-8089
+	 */
+	@Test
+	public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+		final JobMasterId jobMasterId = JobMasterId.generate();
+		final String jobMasterAddress = "foobar";
+		final CompletableFuture allocationIdFuture = new CompletableFuture<>();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+
+		resourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final SlotRequestID slotRequestId1 = new SlotRequestID();
+		final SlotRequestID slotRequestId2 = new SlotRequestID();
+
+		try {
+			slotPool.start(jobMasterId, jobMasterAddress);
+
+			final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+			final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
+
+			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+			CompletableFuture slotFuture1 = slotPoolGateway.allocateSlot(
+				slotRequestId1,
+				scheduledUnit,
+				ResourceProfile.UNKNOWN,
+				Collections.emptyList(),
+				timeout);
+
+			// wait for the first slot request
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			CompletableFuture slotFuture2 = slotPoolGateway.allocateSlot(
+				slotRequestId2,
+				scheduledUnit,
+				ResourceProfile.UNKNOWN,
+				Collections.emptyList(),
+				timeout);
+
+			slotPoolGateway.cancelSlotRequest(slotRequestId1);
+
+			try {
+				// this should fail with a CancellationException
+				slotFuture1.get();
+				fail("The first slot future should have failed because it was cancelled.");
+			} catch (ExecutionException ee) {
+				assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
+			}
+
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
+
+			// the slot offer should fulfil the second slot request
--- End diff --

same here.


---


2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5090#discussion_r155496482
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
@@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that unused offered slots are directly used to fulfil pending slot
+	 * requests.
+	 *
+	 * See FLINK-8089
+	 */
+	@Test
+	public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+		final JobMasterId jobMasterId = JobMasterId.generate();
+		final String jobMasterAddress = "foobar";
+		final CompletableFuture allocationIdFuture = new CompletableFuture<>();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+
+		resourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final SlotRequestID slotRequestId1 = new SlotRequestID();
+		final SlotRequestID slotRequestId2 = new SlotRequestID();
+
+		try {
+			slotPool.start(jobMasterId, jobMasterAddress);
+
+			final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+			final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
+
+			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+			CompletableFuture slotFuture1 = slotPoolGateway.allocateSlot(
+				slotRequestId1,
+				scheduledUnit,
+				ResourceProfile.UNKNOWN,
+				Collections.emptyList(),
+				timeout);
+
+			// wait for the first slot request
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			CompletableFuture slotFuture2 = slotPoolGateway.allocateSlot(
+				slotRequestId2,
+				scheduledUnit,
+				ResourceProfile.UNKNOWN,
+				Collections.emptyList(),
+				timeout);
+
+			slotPoolGateway.cancelSlotRequest(slotRequestId1);
+
+			try {
+				// this should fail with a CancellationException
+				slotFuture1.get();
+				fail("The first slot future should have failed because it was cancelled.");
+			} catch (ExecutionException ee) {
+				assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
+			}
+
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
+
+			// the slot offer should fulfil the second slot request
--- End diff --

nit: same here


---


2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5090#discussion_r155495322
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
@@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that unused offered slots are directly used to fulfil pending slot
--- End diff --

nit: *fulfill* instead of *fulfil*


---


2017-11-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5090

[FLINK-8089] Also check for other pending slot requests in SlotPool#offerSlot

## What is the purpose of the change

Not only check for a slot request with the right allocation id but also check whether we can fulfill other pending slot requests with an unclaimed offered slot before adding it to the list of available slots in `SlotPool`.
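For readers skimming the change, here is a minimal sketch of the offer order described above, modeled on the scenario in the new test. It is not Flink's `SlotPool` code: `SketchSlotPool`, `PendingRequest`, and the plain string ids are hypothetical, and the sketch ignores resource-profile matching and the outstanding resource manager request of a request that gets fulfilled by a different slot.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the offerSlot behaviour described above.
// Slots and requests are identified by plain strings; this is not Flink's SlotPool API.
class SketchSlotPool {

    /** A pending request: the allocation id it asked the resource manager for, plus its future. */
    static final class PendingRequest {
        final String allocationId;
        final CompletableFuture<String> slotFuture = new CompletableFuture<>();

        PendingRequest(String allocationId) {
            this.allocationId = allocationId;
        }
    }

    // Pending requests keyed by slot request id; LinkedHashMap keeps arrival order.
    private final Map<String, PendingRequest> pendingRequests = new LinkedHashMap<>();
    // Slots nobody has claimed yet, identified by allocation id.
    private final List<String> availableSlots = new ArrayList<>();

    CompletableFuture<String> allocateSlot(String slotRequestId, String allocationId) {
        PendingRequest request = new PendingRequest(allocationId);
        pendingRequests.put(slotRequestId, request);
        return request.slotFuture;
    }

    void cancelSlotRequest(String slotRequestId) {
        PendingRequest request = pendingRequests.remove(slotRequestId);
        if (request != null) {
            // completes the future exceptionally with a CancellationException
            request.slotFuture.cancel(false);
        }
    }

    /** Handles an offered slot; this sketch accepts every offer and returns true. */
    boolean offerSlot(String allocationId) {
        // 1. Prefer the pending request that asked for exactly this allocation.
        Iterator<PendingRequest> it = pendingRequests.values().iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next();
            if (request.allocationId.equals(allocationId)) {
                it.remove();
                request.slotFuture.complete(allocationId);
                return true;
            }
        }
        // 2. Otherwise hand the unclaimed slot to the oldest other pending request.
        it = pendingRequests.values().iterator();
        if (it.hasNext()) {
            PendingRequest request = it.next();
            it.remove();
            request.slotFuture.complete(allocationId);
            return true;
        }
        // 3. Only if no request is waiting, keep the slot as available.
        availableSlots.add(allocationId);
        return true;
    }

    List<String> getAvailableSlots() {
        return availableSlots;
    }
}
```

As in the test, cancelling the first request and then offering its slot completes the second request instead of parking the slot in the available list.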

This PR is based on #5089.

## Verifying this change

- Added `SlotPoolTest#testFulfillingSlotRequestsWithUnusedOfferedSlots` to check that unused offered slots are directly used to fulfill other pending slot requests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixSlotOffering

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5090


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementations, which at the moment are Slot, SimpleSlot and SharedSlot.
This is a helpful step towards introducing a different slot implementation for
FLIP-6.
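To illustrate the decoupling idea, a minimal sketch under stated assumptions: `DeployableSlot`, `ExistingSlotAdapter`, `AlternativeSlot` and `TaskDeployer` are hypothetical stand-ins, and the two accessor methods are illustrative rather than the actual `LogicalSlot` methods. The point is only that deployment code compiles against the interface, so a new slot implementation can be plugged in without touching it.

```java
// Hypothetical names throughout: DeployableSlot stands in for the role of a
// LogicalSlot-style interface; its methods are illustrative, not Flink's API.
interface DeployableSlot {
    /** Address of the TaskManager that hosts the slot. */
    String getTaskManagerAddress();

    /** Index of the physical slot on that TaskManager. */
    int getPhysicalSlotNumber();
}

/** One implementation, e.g. an adapter around an existing slot class. */
final class ExistingSlotAdapter implements DeployableSlot {
    private final String taskManagerAddress;
    private final int slotNumber;

    ExistingSlotAdapter(String taskManagerAddress, int slotNumber) {
        this.taskManagerAddress = taskManagerAddress;
        this.slotNumber = slotNumber;
    }

    @Override
    public String getTaskManagerAddress() {
        return taskManagerAddress;
    }

    @Override
    public int getPhysicalSlotNumber() {
        return slotNumber;
    }
}

/** A different implementation that the deployment code can use unchanged. */
final class AlternativeSlot implements DeployableSlot {
    private final String host;
    private final int port;
    private final int slotIndex;

    AlternativeSlot(String host, int port, int slotIndex) {
        this.host = host;
        this.port = port;
        this.slotIndex = slotIndex;
    }

    @Override
    public String getTaskManagerAddress() {
        return host + ":" + port;
    }

    @Override
    public int getPhysicalSlotNumber() {
        return slotIndex;
    }
}

final class TaskDeployer {
    /** Depends only on the interface, never on a concrete slot class. */
    static String describeDeployment(String taskName, DeployableSlot slot) {
        return taskName + " -> " + slot.getTaskManagerAddress() + " slot " + slot.getPhysicalSlotNumber();
    }
}
```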

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot from
the Execution by introducing a Payload interface which is set for a LogicalSlot.
The Payload interface is implemented by the Execution and allows failing the
implementation and obtaining a termination future.

Introduce a proper Execution#releaseFuture which is completed once the Execution's
assigned resource has been released.
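The payload idea can be sketched as follows, assuming hypothetical names (`SlotPayload`, `PayloadSlot`, `FakeExecution`) and illustrative method signatures rather than Flink's actual `Payload` interface: the slot only knows that its payload can be failed and exposes a termination future, so releasing the slot does not require any knowledge of `Execution`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical payload contract: something running in a slot that can be failed. */
interface SlotPayload {
    void fail(Throwable cause);

    CompletableFuture<Void> getTerminationFuture();
}

/** Hypothetical logical slot that knows its payload only through SlotPayload. */
class PayloadSlot {
    private final AtomicReference<SlotPayload> payload = new AtomicReference<>();

    /** Assigns a payload once; returns false if a payload is already set. */
    boolean tryAssignPayload(SlotPayload newPayload) {
        return payload.compareAndSet(null, newPayload);
    }

    /** Fails the payload (if any); the returned future completes once it has terminated. */
    CompletableFuture<Void> release(Throwable cause) {
        SlotPayload current = payload.get();
        if (current == null) {
            return CompletableFuture.completedFuture(null);
        }
        current.fail(cause);
        return current.getTerminationFuture();
    }
}

/** Hypothetical stand-in for an Execution: it terminates as soon as it is failed. */
class FakeExecution implements SlotPayload {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    volatile Throwable failureCause;

    @Override
    public void fail(Throwable cause) {
        failureCause = cause;
        terminationFuture.complete(null);
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }
}
```

Using a compare-and-set for the assignment is one simple way to make sure a slot never carries two payloads at once; the real implementation may handle this differently.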

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext, which is an abstraction for the SimpleSlot
to obtain the relevant slot information for communicating with the TaskManager
without relying on the AllocatedSlot, which is now only used by the SlotPool.
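The dependency direction this commit describes can be sketched like this. All names are hypothetical (`SlotInfo` plays the role the commit gives to `SlotContext`, `PoolAllocatedSlot` the role of `AllocatedSlot`), and the accessors are illustrative: the pool's own slot class implements the narrow view and keeps its bookkeeping private to the pool, while the logical slot only ever holds the view.

```java
// Hypothetical names: SlotInfo plays the role the commit gives to SlotContext.
interface SlotInfo {
    String getAllocationId();

    String getTaskManagerAddress();

    int getPhysicalSlotNumber();
}

/** Pool-internal slot: shares the SlotInfo view but also carries pool-only state. */
final class PoolAllocatedSlot implements SlotInfo {
    private final String allocationId;
    private final String taskManagerAddress;
    private final int physicalSlotNumber;
    // Pool bookkeeping that logical slots must not depend on.
    private boolean inUse;

    PoolAllocatedSlot(String allocationId, String taskManagerAddress, int physicalSlotNumber) {
        this.allocationId = allocationId;
        this.taskManagerAddress = taskManagerAddress;
        this.physicalSlotNumber = physicalSlotNumber;
    }

    @Override
    public String getAllocationId() {
        return allocationId;
    }

    @Override
    public String getTaskManagerAddress() {
        return taskManagerAddress;
    }

    @Override
    public int getPhysicalSlotNumber() {
        return physicalSlotNumber;
    }

    void markInUse() {
        inUse = true;
    }

    boolean isInUse() {
        return inUse;
    }
}

/** Logical slot that only sees the SlotInfo view, never the pool's slot class. */
final class SimpleLogicalSlot {
    private final SlotInfo slotInfo;

    SimpleLogicalSlot(SlotInfo slotInfo) {
        this.slotInfo = slotInfo;
    }

    /** Builds the target of a (hypothetical) submit-task call to the TaskManager. */
    String submitTarget() {
        return slotInfo.getTaskManagerAddress() + "/" + slotInfo.getAllocationId()
            + "/" + slotInfo.getPhysicalSlotNumber();
    }
}
```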

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088] Associate logical slots with the slot request id

Before, logical slots like the SimpleSlot and SharedSlot were associated with the
actually allocated slot via the AllocationID. This, however, was sub-optimal because
allocated slots can be reused to fulfill other slot requests (logical slots) as well.
Therefore, we should bind the logical slots to the id with the right lifecycle,
which is the slot request id.
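Why the slot request id has the right lifecycle can be shown with a small sketch, assuming a hypothetical `LogicalSlotRegistry` with plain string ids (not a Flink class): because one allocated slot may back several requests over time, keying logical slots by request id means a late release of an old request cannot tear down a newer request that reuses the same allocation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry of logical slots keyed by slot request id. */
class LogicalSlotRegistry {
    // slot request id -> allocation id of the physical slot currently backing it
    private final Map<String, String> allocationBySlotRequest = new HashMap<>();

    void assign(String slotRequestId, String allocationId) {
        allocationBySlotRequest.put(slotRequestId, allocationId);
    }

    /** Releases only this request's logical slot, even if its physical slot was reused. */
    boolean release(String slotRequestId) {
        return allocationBySlotRequest.remove(slotRequestId) != null;
    }

    boolean isActive(String slotRequestId) {
        return allocationBySlotRequest.containsKey(slotRequestId);
    }
}
```

Had the map been keyed by allocation id instead, the stale release of the first request would have removed the entry now owned by the second one.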

commit 3e4550c0607744b20893dc90c587b63e68e4de1e
Author: Till Rohrmann 
Date:   2017-11-13T14:42:07Z

[FLINK-8089] Also check for other pending slot requests in offerSlot

Not only check for a slot request with the right allocation id but also check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots.

commit b04dda46aaf298d921929910574662970d9c5093
Author: Till Rohrmann 
Date:   2017-11-24T22:29:53Z

[hotfix] Speed up RecoveryITCase




---