[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441577695



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() 
throws Exception {
}
}
 
+   @Test
+   public void testFailingAllocationFailsRemappedPendingSlotRequests() 
throws Exception {
+   final List allocations = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
allocations.add(slotRequest.getAllocationId()));
+
+   try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
   Sure, I agree that we can do the overall test cleanup as a separate
issue if it is too much for this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441364374



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -37,13 +41,13 @@
 
private final LinkedHashMap> aMap;
 
-   private final LinkedHashMap bMap;
+   private final HashMap bMap;

Review comment:
   Alright









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441359953



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() 
throws Exception {
}
}
 
+   @Test
+   public void testOrphanedAllocationCanBeRemapped() throws Exception {
+   try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+   final List allocationIds = new 
ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(
+   slotRequest -> 
allocationIds.add(slotRequest.getAllocationId()));
+
+   final List canceledAllocations = new 
ArrayList<>();
+   
resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+   setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);
+   final Scheduler scheduler = setupScheduler(slotPool, 
mainThreadExecutor);

Review comment:
   Why do we use `Scheduler` to unit test `SlotPoolImpl`?
   Why not call `SlotPoolImpl` directly, as in
`testFailingAllocationFailsRemappedPendingSlotRequests`?

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() 
throws Exception {
}
}
 
+   @Test
+   public void testOrphanedAllocationCanBeRemapped() throws Exception {
+   try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+   final List allocationIds = new 
ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(
+   slotRequest -> 
allocationIds.add(slotRequest.getAllocationId()));
+
+   final List canceledAllocations = new 
ArrayList<>();
+   
resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+   setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);
+   final Scheduler scheduler = setupScheduler(slotPool, 
mainThreadExecutor);
+
+   final SlotRequestId slotRequestId1 = new 
SlotRequestId();
+   scheduler.allocateSlot(
+   slotRequestId1,
+   new DummyScheduledUnit(),
+   SlotProfile.noRequirements(),
+   timeout);

Review comment:
   ```suggestion
allocateSlot(scheduler, slotRequestId1);
   ```

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() 
throws Exception {
}
}
 
+   @Test
+   public void testOrphanedAllocationCanBeRemapped() throws Exception {
+   try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+   final List allocationIds = new 
ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(
+   slotRequest -> 
allocationIds.add(slotRequest.getAllocationId()));
+
+   final List canceledAllocations = new 
ArrayList<>();
+   
resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

Review comment:
   nit: maybe not now, but if it can also be reused in other tests, it
would be nice to have something like an RM harness:
   ```
   class RmHarness {
   final List allocationIds = new ArrayList<>();
   final List canceledAllocations = new ArrayList<>();
   RmHarness(resourceManagerGateway)
   getAllocations
   getCanceled
   }
   ```
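
   A slightly fuller version of the harness idea above could look like the following. This is only a sketch under assumptions: `StubResourceManagerGateway` is a hypothetical stand-in for the testing gateway, and plain `String` ids replace the real allocation id type so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the testing gateway; only the two hooks used by the tests are modeled.
class StubResourceManagerGateway {
    private Consumer<String> requestSlotConsumer = id -> {};
    private Consumer<String> cancelSlotConsumer = id -> {};

    void setRequestSlotConsumer(Consumer<String> consumer) {
        this.requestSlotConsumer = consumer;
    }

    void setCancelSlotConsumer(Consumer<String> consumer) {
        this.cancelSlotConsumer = consumer;
    }

    // These two methods simulate the slot pool talking to the resource manager.
    void requestSlot(String allocationId) {
        requestSlotConsumer.accept(allocationId);
    }

    void cancelSlotRequest(String allocationId) {
        cancelSlotConsumer.accept(allocationId);
    }
}

// Records requested and canceled allocations so that tests do not repeat the wiring.
class RmHarness {
    private final List<String> allocationIds = new ArrayList<>();
    private final List<String> canceledAllocations = new ArrayList<>();

    RmHarness(StubResourceManagerGateway gateway) {
        gateway.setRequestSlotConsumer(allocationIds::add);
        gateway.setCancelSlotConsumer(canceledAllocations::add);
    }

    List<String> getAllocations() {
        return Collections.unmodifiableList(allocationIds);
    }

    List<String> getCanceled() {
        return Collections.unmodifiableList(canceledAllocations);
    }
}
```

   A test would then create the harness once and assert on `getAllocations()` and `getCanceled()` instead of declaring both lists inline.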

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() 
throws Exception {
}
}
 
+   @Test
+   public void testFailingAllocationFailsRemappedPendingSlotRequests() 
throws Exception {
+   final List allocations = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
allocations.add(slotRequest.getAllocationId()));
+
+   try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
   Could we also reuse `setUpSlotPool` in `SlotPoolImplTest` to deduplicate the setup?

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() 
throws Exception {
}
 

[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441346884



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -37,13 +41,13 @@
 
private final LinkedHashMap> aMap;
 
-   private final LinkedHashMap bMap;
+   private final HashMap bMap;

Review comment:
   Hmm, for me, the order would not be obvious anyway without looking
into either the implementation or the Javadoc comment. I am not strictly opposed to the
change, but I would avoid it if it is not strictly needed.









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441343582



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final 
AllocatedSlot slot) {
return null;
}
 
+   private void maybeRemapOrphanedAllocation(
+   final AllocationID allocationIdOfRequest,
+   final AllocationID allocationIdOfSlot) {
+
+   final AllocationID orphanedAllocationId = 
allocationIdOfRequest.equals(allocationIdOfSlot)
+   ? null : allocationIdOfRequest;
+
+   // if the request that initiated the allocation is still 
pending, it should take over the orphaned allocation
+   // of the fulfilled request so that it can fail fast if the 
remapped allocation fails
+   if (orphanedAllocationId != null) {
+   final SlotRequestId requestIdOfAllocatedSlot = 
pendingRequests.getKeyA(allocationIdOfSlot);
+   if (requestIdOfAllocatedSlot != null) {
+   final PendingRequest requestOfAllocatedSlot = 
pendingRequests.getByKeyA(requestIdOfAllocatedSlot);
+   
requestOfAllocatedSlot.setAllocationId(orphanedAllocationId);
+
+   // this re-insertion of initiatedRequestId will 
not affect its original insertion order
+   pendingRequests.put(requestIdOfAllocatedSlot, 
orphanedAllocationId, requestOfAllocatedSlot);
+   } else {
+   // cancel the slot request if the orphaned 
allocation is not remapped to a pending request.
+   // the request id can be null if the slot is 
returned by scheduler
+   
resourceManagerGateway.cancelSlotRequest(orphanedAllocationId);

Review comment:
   Alright, so this effectively releases the slot at the RM at the
same time.









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-16 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r440693739



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -22,12 +22,16 @@
 
 import java.util.AbstractCollection;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Set;
 
 /**
- * Map which stores values under two different indices.
+ * Map which stores values under two different indices. The mapping of the 
primary key to the
+ * value is backed by {@link LinkedHashMap} so that the iteration order over 
the values and
+ * the primary key set is the insertion order. Note that there is no contract 
of the iteration
+ * order over the secondary key set.

Review comment:
   I would also explicitly document that `@param <A>` is the primary key.
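
   To illustrate the ordering contract described in the Javadoc above, here is a minimal sketch of a two-key map whose primary side is a `LinkedHashMap`. It is an assumption-laden stand-in, not the actual `DualKeyLinkedMap`: the class name `DualKeySketch` and the `keysA()` helper are hypothetical, and the accessor names follow the renames suggested in this review.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in only; not the actual DualKeyLinkedMap implementation.
class DualKeySketch<A, B, V> {
    // Primary key -> (secondary key, value). LinkedHashMap keeps insertion order.
    private final LinkedHashMap<A, Map.Entry<B, V>> aMap = new LinkedHashMap<>();
    // Secondary key -> primary key. No iteration order is promised here.
    private final HashMap<B, A> bMap = new HashMap<>();

    void put(A aKey, B bKey, V value) {
        // Drop a mapping that currently uses this secondary key under another primary key.
        A previousOwner = bMap.get(bKey);
        if (previousOwner != null && !previousOwner.equals(aKey)) {
            aMap.remove(previousOwner);
        }
        // Drop the stale secondary key that was paired with this primary key.
        Map.Entry<B, V> previous = aMap.get(aKey);
        if (previous != null) {
            bMap.remove(previous.getKey());
        }
        // LinkedHashMap.put on an existing key keeps the key's original position.
        aMap.put(aKey, new SimpleImmutableEntry<>(bKey, value));
        bMap.put(bKey, aKey);
    }

    V getValueByKeyA(A aKey) {
        Map.Entry<B, V> entry = aMap.get(aKey);
        return entry == null ? null : entry.getValue();
    }

    V getValueByKeyB(B bKey) {
        A aKey = bMap.get(bKey);
        return aKey == null ? null : getValueByKeyA(aKey);
    }

    A getKeyAByKeyB(B bKey) {
        return bMap.get(bKey);
    }

    // Primary keys in insertion order.
    List<A> keysA() {
        return new ArrayList<>(aMap.keySet());
    }

    // Values in the insertion order of their primary keys.
    List<V> values() {
        List<V> result = new ArrayList<>();
        for (Map.Entry<B, V> entry : aMap.values()) {
            result.add(entry.getValue());
        }
        return result;
    }
}
```

   Re-inserting an existing primary key with a new secondary key replaces the value and removes the old secondary key, while the primary key keeps its place in the iteration order.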

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -37,13 +41,13 @@
 
private final LinkedHashMap> aMap;
 
-   private final LinkedHashMap bMap;
+   private final HashMap bMap;

Review comment:
   Does it matter for this PR which type `bMap` has?

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##
@@ -85,4 +85,28 @@ public void 
ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
assertThat(map.getByKeyB(1), is(secondValue));
assertThat(map.getByKeyA(2), is(secondValue));
}
+
+   @Test
+   public void 
testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+   final DualKeyLinkedMap map = new 
DualKeyLinkedMap<>(2);
+
+   final String value = "foobar";
+   map.put(1, 1, value);
+   map.put(2, 2, value);
+
+   map.put(1, 1, value);
+   assertThat(map.keySetA().iterator().next(), is(1));

Review comment:
   I think `values()` is also worth checking in both added tests.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
}
}
 
+   public A getKeyA(B bKey) {

Review comment:
   ```suggestion
public A getKeyAByKeyB(B bKey) {
   ```

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -64,7 +64,7 @@ public V getKeyA(A aKey) {
}
}
 
-   public V getKeyB(B bKey) {
+   public V getByKeyB(B bKey) {

Review comment:
   ```suggestion
public V getValueByKeyB(B bKey) {
   ```

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -54,7 +54,7 @@ public int size() {
return aMap.size();
}
 
-   public V getKeyA(A aKey) {
+   public V getByKeyA(A aKey) {

Review comment:
   ```suggestion
public V getValueByKeyA(A aKey) {
   ```

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##
@@ -85,4 +85,28 @@ public void 
ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
assertThat(map.getByKeyB(1), is(secondValue));
assertThat(map.getByKeyA(2), is(secondValue));
}
+
+   @Test
+   public void 
testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+   final DualKeyLinkedMap map = new 
DualKeyLinkedMap<>(2);
+
+   final String value = "foobar";
+   map.put(1, 1, value);
+   map.put(2, 2, value);
+
+   map.put(1, 1, value);
+   assertThat(map.keySetA().iterator().next(), is(1));
+   }
+
+   @Test
+   public void 
testPrimaryKeyOrderIsNotAffectedIfReInsertedWithDifferentSecondaryKey() {
+   final DualKeyLinkedMap map = new 
DualKeyLinkedMap<>(2);
+
+   final String value = "foobar";
+   map.put(1, 1, value);
+   map.put(2, 2, value);
+
+   map.put(1, 3, value);

Review comment:
   Do we also want to check the cleanup of key B `3` if it were already in the map?

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final 
AllocatedSlot slot) {
return null;
}
 
+   private void maybeRemapOrphanedAllocation(
+   final AllocationID allocationIdOfRequest,
+   final AllocationID allocationIdOfSlot) {
+
+   final AllocationID orphanedAllocationId = 
allocationIdOfRequest.equals(allocationIdOfSlot)
+   

[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-11 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r438835441



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -37,20 +44,20 @@
 
private final LinkedHashMap> aMap;
 
-   private final LinkedHashMap bMap;
+   private final HashMap bMap;
 
private transient Collection values;
 
public DualKeyLinkedMap(int initialCapacity) {

Review comment:
   Makes sense to me









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-11 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r438824119



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);

Review comment:
   Ok, FLINK-13165 makes slot requests complete in order only if
the offers come with unknown `AllocationID`s, right? Generally, we expect that the
RM keeps the `AllocationID` matched to the `SlotRequestId`. I am fine with breaking the
tie `SlotRequestId->AllocationID` if there are no known consequences.
Eventually, I hope it might even help to simplify the `SlotPoolImpl`.









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-04 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r435362084



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);

Review comment:
   There is this `UnfulfillableSlotRequestException`, which is still a fail-fast
route if the RM finds that a certain request profile cannot be fulfilled at
all with any existing slot and no matching slot can be allocated. As far as I can see,
it is relevant for batch, streaming and bulk requests alike. I do not know the whole
background of this. At first glance, this looks to me like an optimisation that complicates
things a bit at the moment. It is probably necessary to avoid waiting for the timeout
to cancel everything if it is already clear that the allocation can never succeed.
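
   The fail-fast idea can be sketched as follows: a pending request is tracked by its allocation id, and a failure reported for that allocation completes the request's future exceptionally right away instead of leaving it to a timeout. All names here (`FailFastSketch`, `request`, `failAllocation`) are hypothetical, and a plain `IllegalStateException` in the usage stands in for the real exception type.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative only: pending requests keyed by the allocation they are waiting for.
class FailFastSketch {
    private final Map<String, CompletableFuture<String>> pendingByAllocation = new LinkedHashMap<>();

    // Registers a pending request and returns the future that a slot would complete.
    CompletableFuture<String> request(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingByAllocation.put(allocationId, future);
        return future;
    }

    // Called when the allocation is reported as failed; the waiting request fails immediately.
    void failAllocation(String allocationId, Throwable cause) {
        CompletableFuture<String> future = pendingByAllocation.remove(allocationId);
        if (future != null) {
            future.completeExceptionally(cause);
        }
    }
}
```

   This only works if the pending request is still associated with the allocation that fails, which is why a request whose original allocation was handed to another request needs to take over the orphaned allocation.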









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-04 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r435362084



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);

Review comment:
   There is this `UnfulfillableSlotRequestException`, which is still a fail-fast
route if the RM finds that a certain request profile cannot be fulfilled at
all with any existing slot and no matching slot can be allocated. As far as I can see,
it is relevant for batch, streaming and bulk requests alike. I do not know the whole
background of this. At first glance, it seems to be a complication, but it is probably
necessary to avoid waiting for the timeout to cancel everything if it is already clear
that the allocation can never succeed.









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-04 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r435362084



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);

Review comment:
   There is this `UnfulfillableSlotRequestException`, which is still a fail-fast
route if the RM finds that a certain request profile cannot be fulfilled at
all with any existing slot and no matching slot can be allocated. As far as I can see,
it is relevant for batch, streaming and bulk requests alike. I do not know the whole
background of this. At first glance, it seems to be a complication, but it is probably
necessary to avoid waiting for the timeout to cancel everything if it is already clear
that the allocation cannot succeed.









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-04 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r435362084



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);

Review comment:
   There is this `UnfulfillableSlotRequestException`, which is still a fail-fast
route if the RM finds that a certain request profile cannot be fulfilled at
all with any existing slot and no matching slot can be allocated. As far as I can see,
it is relevant for batch, streaming and bulk requests alike. I do not know the whole
background of this. At first glance, it seems to be a complication, but it might be necessary.









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-05-27 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r430441131



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -112,6 +114,9 @@
/** The requests that are waiting for the resource manager to be 
connected. */
private final LinkedHashMap 
waitingForResourceManager;
 
+   /** Maps a request to its allocation. */
+   private final BiMap requestedAllocations;

Review comment:
   Looking into the implementation of `DualKeyLinkedMap` for 
`pendingRequests`, it seems we can just remove the first matching 
`SlotRequestId` and then remap the orphaned `SlotRequestId` to its 
`AllocationID`. The original insertion order in `DualKeyLinkedMap.aMap`
should not be affected. If so, we could remove `requestedAllocations`.
   
   EDIT: `waitingForResourceManager` -> `pendingRequests`
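
   The remapping described above can be sketched roughly as below. This is a simplified model under stated assumptions, not the `SlotPoolImpl` logic: ids are plain strings, resource profile matching is ignored so the oldest pending request always takes the offered slot, and every name in the class is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model: pending requests are fulfilled in request order, whatever slot arrives.
class OrderedSlotPoolSketch {
    // Request id -> allocation id it currently waits for, kept in request order.
    private final LinkedHashMap<String, String> pendingByRequest = new LinkedHashMap<>();
    // Allocation id -> request id that currently owns it.
    private final Map<String, String> requestByAllocation = new HashMap<>();
    // Allocations that had to be canceled because no pending request could take them over.
    final List<String> canceledAllocations = new ArrayList<>();

    void request(String requestId, String allocationId) {
        pendingByRequest.put(requestId, allocationId);
        requestByAllocation.put(allocationId, requestId);
    }

    // Returns the id of the request fulfilled by the offered slot, or null if none is pending.
    String offerSlot(String allocationIdOfSlot) {
        if (pendingByRequest.isEmpty()) {
            return null;
        }
        // The oldest pending request gets the slot.
        Map.Entry<String, String> oldest = pendingByRequest.entrySet().iterator().next();
        String fulfilledRequest = oldest.getKey();
        String allocationIdOfRequest = oldest.getValue();
        pendingByRequest.remove(fulfilledRequest);
        requestByAllocation.remove(allocationIdOfRequest);

        if (!allocationIdOfRequest.equals(allocationIdOfSlot)) {
            // The fulfilled request's own allocation is now orphaned.
            String initiator = requestByAllocation.remove(allocationIdOfSlot);
            if (initiator != null) {
                // Re-putting an existing key keeps its position in the LinkedHashMap.
                pendingByRequest.put(initiator, allocationIdOfRequest);
                requestByAllocation.put(allocationIdOfRequest, initiator);
            } else {
                canceledAllocations.add(allocationIdOfRequest);
            }
        }
        return fulfilledRequest;
    }

    List<String> pendingRequestIds() {
        return new ArrayList<>(pendingByRequest.keySet());
    }

    String allocationOf(String requestId) {
        return pendingByRequest.get(requestId);
    }
}
```

   With requests `r1`, `r2`, `r3` waiting for `a1`, `a2`, `a3`, offering the slot for `a2` fulfills `r1`, and `r2` takes over `a1` without losing its place in the queue.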









[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-05-26 Thread GitBox


azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r430441131



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -112,6 +114,9 @@
/** The requests that are waiting for the resource manager to be 
connected. */
private final LinkedHashMap 
waitingForResourceManager;
 
+   /** Maps a request to its allocation. */
+   private final BiMap requestedAllocations;

Review comment:
   Looking into the implementation of `DualKeyLinkedMap` for 
`waitingForResourceManager`, it seems we can just remove the first matching 
`SlotRequestId` and then remap the orphaned `SlotRequestId` to its 
`AllocationID`. The original insertion order in `DualKeyLinkedMap.aMap`
should not be affected. If so, we could remove `requestedAllocations`.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);

Review comment:
   I am not sure about all the consequences of this change for the existing
scheduling. I mean that we no longer respect `SlotRequestId->AllocationID` when
accepting the slot offer. Would it make sense to keep this behaviour
configurable for now, depending on the scheduling strategy? Or is this complication
not needed?




