[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5090

---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5090#discussion_r156882122

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
    @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
             }
         }
    +    /**
    +     * Tests that unused offered slots are directly used to fulfil pending slot
    --- End diff --

    True, will use the American spelling.

---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5090#discussion_r156882139

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
    @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
             }
         }
    +    /**
    +     * Tests that unused offered slots are directly used to fulfil pending slot
    +     * requests.
    +     *
    +     * See FLINK-8089
    +     */
    +    @Test
    +    public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
    +        final SlotPool slotPool = new SlotPool(rpcService, jobId);
    +
    +        final JobMasterId jobMasterId = JobMasterId.generate();
    +        final String jobMasterAddress = "foobar";
    +        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
    +        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
    +
    +        resourceManagerGateway.setRequestSlotConsumer(
    +            (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
    +
    +        final SlotRequestID slotRequestId1 = new SlotRequestID();
    +        final SlotRequestID slotRequestId2 = new SlotRequestID();
    +
    +        try {
    +            slotPool.start(jobMasterId, jobMasterAddress);
    +
    +            final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
    +
    +            final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
    +
    +            slotPoolGateway.connectToResourceManager(resourceManagerGateway);
    +
    +            CompletableFuture slotFuture1 = slotPoolGateway.allocateSlot(
    +                slotRequestId1,
    +                scheduledUnit,
    +                ResourceProfile.UNKNOWN,
    +                Collections.emptyList(),
    +                timeout);
    +
    +            // wait for the first slot request
    +            final AllocationID allocationId = allocationIdFuture.get();
    +
    +            CompletableFuture slotFuture2 = slotPoolGateway.allocateSlot(
    +                slotRequestId2,
    +                scheduledUnit,
    +                ResourceProfile.UNKNOWN,
    +                Collections.emptyList(),
    +                timeout);
    +
    +            slotPoolGateway.cancelSlotRequest(slotRequestId1);
    +
    +            try {
    +                // this should fail with a CancellationException
    +                slotFuture1.get();
    +                fail("The first slot future should have failed because it was cancelled.");
    +            } catch (ExecutionException ee) {
    +                assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
    +            }
    +
    +            final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
    +
    +            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
    +
    +            assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
    +
    +            // the slot offer should fulfil the second slot request
    --- End diff --

    same here.

---
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5090#discussion_r155496482

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
    @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
             }
         }
    +    /**
    +     * Tests that unused offered slots are directly used to fulfil pending slot
    +     * requests.
    +     *
    +     * See FLINK-8089
    +     */
    +    @Test
    +    public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
    +        final SlotPool slotPool = new SlotPool(rpcService, jobId);
    +
    +        final JobMasterId jobMasterId = JobMasterId.generate();
    +        final String jobMasterAddress = "foobar";
    +        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
    +        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
    +
    +        resourceManagerGateway.setRequestSlotConsumer(
    +            (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
    +
    +        final SlotRequestID slotRequestId1 = new SlotRequestID();
    +        final SlotRequestID slotRequestId2 = new SlotRequestID();
    +
    +        try {
    +            slotPool.start(jobMasterId, jobMasterAddress);
    +
    +            final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
    +
    +            final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
    +
    +            slotPoolGateway.connectToResourceManager(resourceManagerGateway);
    +
    +            CompletableFuture slotFuture1 = slotPoolGateway.allocateSlot(
    +                slotRequestId1,
    +                scheduledUnit,
    +                ResourceProfile.UNKNOWN,
    +                Collections.emptyList(),
    +                timeout);
    +
    +            // wait for the first slot request
    +            final AllocationID allocationId = allocationIdFuture.get();
    +
    +            CompletableFuture slotFuture2 = slotPoolGateway.allocateSlot(
    +                slotRequestId2,
    +                scheduledUnit,
    +                ResourceProfile.UNKNOWN,
    +                Collections.emptyList(),
    +                timeout);
    +
    +            slotPoolGateway.cancelSlotRequest(slotRequestId1);
    +
    +            try {
    +                // this should fail with a CancellationException
    +                slotFuture1.get();
    +                fail("The first slot future should have failed because it was cancelled.");
    +            } catch (ExecutionException ee) {
    +                assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
    +            }
    +
    +            final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
    +
    +            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
    +
    +            assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
    +
    +            // the slot offer should fulfil the second slot request
    --- End diff --

    nit: same here

---
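The quoted test expects the cancelled request to surface as an `ExecutionException` whose cause is a `CancellationException`. The following is a minimal plain-JDK sketch of one way that shape can arise; it does not use Flink's RPC layer, and the class and method names (`CancelledFutureSketch`, `outcomeOfGet`) are hypothetical. A directly cancelled `CompletableFuture` throws `CancellationException` from `get()`, whereas a dependent stage derived from it reports the cancellation wrapped in an `ExecutionException`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration class; not part of Flink.
final class CancelledFutureSketch {

    /** Describes what a blocking get() on the given future surfaces. */
    static String outcomeOfGet(CompletableFuture<?> future) {
        try {
            future.get();
            return "completed normally";
        } catch (CancellationException ce) {
            // Thrown directly when the future itself was cancelled.
            return "CancellationException";
        } catch (ExecutionException ee) {
            // Thrown when the failure reached this future through another stage.
            return "ExecutionException caused by " + ee.getCause().getClass().getSimpleName();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        // A dependent stage, standing in for a future handed out by another component.
        CompletableFuture<String> dependent = source.thenApply(value -> value);

        source.cancel(false);

        System.out.println("source:    " + outcomeOfGet(source));
        System.out.println("dependent: " + outcomeOfGet(dependent));
    }
}
```

A test that has to accept either shape can unwrap the `ExecutionException` first and then check the cause, which is what the `ExceptionUtils.stripExecutionException` call in the quoted diff appears to do.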
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5090#discussion_r155495322

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---
    @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
             }
         }
    +    /**
    +     * Tests that unused offered slots are directly used to fulfil pending slot
    --- End diff --

    nit: *fulfill* instead of *fulfil*

---
GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5090

    [FLINK-8089] Also check for other pending slot requests in SlotPool#offerSlot

    ## What is the purpose of the change

    Not only check for a slot request with the right allocation id, but also check whether an unclaimed offered slot can fulfill another pending slot request before adding it to the list of available slots in `SlotPool`.

    This PR is based on #5089.

    ## Verifying this change

    - Added `SlotPoolTest#testFulfillingSlotRequestsWithUnusedOfferedSlots` to check that unused offered slots are directly used to fulfill other pending slot requests

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

    CC: @GJL

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixSlotOffering

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5090

----
commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann
Date:   2017-11-14T22:50:52Z

    [FLINK-8078] Introduce LogicalSlot interface

    The LogicalSlot interface decouples the task deployment from the actual
    slot implementation, which at the moment is Slot, SimpleSlot and SharedSlot.
    This is a helpful step towards introducing a different slot implementation
    for Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann
Date:   2017-11-15T13:20:27Z

    [FLINK-8085] Thin out LogicalSlot interface

    Remove the isCanceled and isReleased methods and decouple the logical slot
    from Execution by introducing a Payload interface which is set for a
    LogicalSlot. The Payload interface is implemented by the Execution and
    allows failing an implementation and obtaining a termination future.

    Introduce a proper Execution#releaseFuture which is completed once the
    Execution's assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann
Date:   2017-11-24T17:03:49Z

    [FLINK-8087] Decouple Slot from AllocatedSlot

    This commit introduces the SlotContext, an abstraction through which the
    SimpleSlot obtains the slot information it needs to communicate with the
    TaskManager without relying on the AllocatedSlot, which is now only used
    by the SlotPool.

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann
Date:   2017-11-24T17:06:10Z

    [FLINK-8088] Associate logical slots with the slot request id

    Before, logical slots like the SimpleSlot and SharedSlot were associated
    with the actually allocated slot via the AllocationID. This, however, was
    sub-optimal because allocated slots can be re-used to also fulfill other
    slot requests (logical slots). Therefore, we should bind the logical slots
    to the id with the right lifecycle, which is the slot request id.

commit 3e4550c0607744b20893dc90c587b63e68e4de1e
Author: Till Rohrmann
Date:   2017-11-13T14:42:07Z

    [FLINK-8089] Also check for other pending slot requests in offerSlot

    Not only check for a slot request with the right allocation id, but also
    check whether an unclaimed offered slot can fulfill another pending slot
    request before adding it to the list of available slots.

commit b04dda46aaf298d921929910574662970d9c5093
Author: Till Rohrmann
Date:   2017-11-24T22:29:53Z

    [hotfix] Speed up RecoveryITCase

----

---
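
The offer handling described in the pull request can be sketched roughly as follows. This is a simplified, hypothetical stand-in and not Flink's `SlotPool`: the class `SlotOfferSketch`, its methods, and the use of plain strings for allocation ids are assumptions made for illustration, and resource-profile matching, task manager registration and RPC are left out entirely.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the offer handling; not Flink's SlotPool.
final class SlotOfferSketch {

    // Pending requests keyed by the allocation id sent out for them, in request order.
    private final Map<String, CompletableFuture<String>> pendingByAllocationId = new LinkedHashMap<>();

    // Offered slots that no pending request could use.
    private final List<String> availableSlots = new ArrayList<>();

    CompletableFuture<String> requestSlot(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingByAllocationId.put(allocationId, future);
        return future;
    }

    void cancelRequest(String allocationId) {
        CompletableFuture<String> future = pendingByAllocationId.remove(allocationId);
        if (future != null) {
            future.cancel(false);
        }
    }

    void offerSlot(String allocationId) {
        // 1. Prefer the pending request this slot was allocated for.
        CompletableFuture<String> exactMatch = pendingByAllocationId.remove(allocationId);
        if (exactMatch != null) {
            exactMatch.complete(allocationId);
            return;
        }
        // 2. Otherwise hand the unclaimed slot to the oldest other pending request.
        Iterator<CompletableFuture<String>> others = pendingByAllocationId.values().iterator();
        if (others.hasNext()) {
            CompletableFuture<String> other = others.next();
            others.remove();
            other.complete(allocationId);
            return;
        }
        // 3. Only if nobody is waiting does the slot become available.
        availableSlots.add(allocationId);
    }

    int availableSlotCount() {
        return availableSlots.size();
    }

    public static void main(String[] args) {
        SlotOfferSketch pool = new SlotOfferSketch();
        CompletableFuture<String> first = pool.requestSlot("allocation-1");
        CompletableFuture<String> second = pool.requestSlot("allocation-2");

        // Same sequence as the test in the PR: cancel the first request, then
        // receive the slot that was originally allocated for it.
        pool.cancelRequest("allocation-1");
        pool.offerSlot("allocation-1");

        System.out.println("first cancelled: " + first.isCancelled());
        System.out.println("second fulfilled with: " + second.getNow(null));
        System.out.println("available slots: " + pool.availableSlotCount());
    }
}
```

Without step 2, the slot offered for the cancelled request would go straight to the available list while the second request kept waiting for its own allocation, which is the situation the PR description sets out to avoid.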