Hwanju Kim created FLINK-14589:
----------------------------------

             Summary: Redundant slot requests with the same AllocationID lead 
to inconsistent slot table
                 Key: FLINK-14589
                 URL: https://issues.apache.org/jira/browse/FLINK-14589
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.6.3
            Reporter: Hwanju Kim


_NOTE: We found this issue in 1.6.2, but I checked that the relevant code is still 
in the mainline. What I am not sure about, however, is whether other slot-related 
fixes made after 1.6.2 (such as FLINK-11059, FLINK-12863, etc.) would prevent the 
initial cause of this issue from happening. So far, I have not found a fix for the 
issue I am describing here, so I am opening this one. Please feel free to 
deduplicate it if another issue already covers it. Please also note that we have 
already picked up FLINK-9912, which turned out to be a major fix for a slot 
allocation failure issue; I will note its ramifications in case others experience 
the same problem._
h2. Summary

When *requestSlot* is called from the ResourceManager (RM) to the TaskManager (TM), 
the TM first reserves the requested slot by marking it ALLOCATED, offers the slot to 
the JM, and marks the slot ACTIVE once it gets an acknowledgement from the JM. This 
three-way communication for slot allocation is identified by an AllocationID, which 
is initially generated by the JM. The TM reserves a slot by calling 
*TaskSlotTable.allocateSlot* if the requested slot number (i.e., slot index) is 
free to use. The major data structure is *TaskSlot*, indexed by slot index. Once 
the slot is marked ALLOCATED with a given AllocationID, the table updates other 
maps such as *allocationIDTaskSlotMap*, keyed by AllocationID, and *slotsPerJob*, 
keyed by JobID. When updating *allocationIDTaskSlotMap*, it directly calls 
*allocationIDTaskSlotMap.put(allocationId, taskSlot)*, which overwrites any 
existing entry with the same AllocationID. This renders *TaskSlot* and 
*allocationIDTaskSlotMap* inconsistent: the former says two slots are allocated by 
the same AllocationID, while the latter says the AllocationID maps only to the 
latest task slot. In this state, once the slot is freed, *freeSlot* is driven by 
AllocationID, so it fetches the slot index of the task slot that arrived later from 
*allocationIDTaskSlotMap*, marks that slot free, and removes it from 
*allocationIDTaskSlotMap*. The old task slot, however, is still marked as 
allocated. It becomes a zombie and can never be freed. This can cause permanent 
slot allocation failure if TM slots are statically and tightly provisioned and the 
resource manager does not actively spawn new TMs when slots are unavailable (e.g., 
Kubernetes without active-mode integration, which is not yet available).
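
The reservation path can be sketched roughly as follows (simplified and paraphrased, not the actual Flink source; method and field names follow *TaskSlotTable* in 1.6.x):
{code:java}
// Simplified sketch of the reservation path described above (paraphrased,
// not the actual Flink source); names follow TaskSlotTable in 1.6.x.
public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) {
    TaskSlot taskSlot = taskSlots.get(index);

    // Marks the TaskSlot ALLOCATED for (jobId, allocationId) if it is free.
    boolean result = taskSlot.allocate(jobId, allocationId);

    if (result) {
        // Problematic update: if another TaskSlot is already registered under
        // the same AllocationID, this put() silently replaces that entry. The
        // old TaskSlot stays ALLOCATED but is no longer reachable by
        // AllocationID, so freeSlot(allocationId) can never release it.
        allocationIDTaskSlotMap.put(allocationId, taskSlot);
        slotsPerJob.computeIfAbsent(jobId, id -> new HashSet<>()).add(allocationId);
    }

    return result;
}
{code}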
h2. Scenario

From my observation, redundant slot requests with the same AllocationID and 
different slot indices should be rare, but they can happen under a race condition, 
especially when repeated fail-overs and heartbeat timeouts (primarily caused by 
transient resource overload, not permanent network partition/node outage) are 
taking place. The following is a detailed scenario that can lead to this issue 
(AID is AllocationID):
 # AID1 is requested from JM and put in the pending request queue in RM.
 # RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot with Slot1 and AID1. This slot request is now in flight.
 # In the meantime, Slot1 is occupied by AID2 in TM for a delayed slot request, and TM sends a slot report via heartbeat to RM saying Slot1 is already allocated with AID2.
 # RM's heartbeat handler identifies that Slot1 is occupied with a different AID (AID2), so it rejects the pending request sent in step 2.
 # handleFailedSlotRequest puts the rejected AID1 back into the pending requests by retrying the slot request. It now picks up another available slot, say Slot2, so the retried slot request with Slot2 and AID1 is in flight.
 # In the meantime, Slot1 occupied by AID2 is freed (by any disconnection from JM, or by releasing all the tasks in the slot on cancellation/failure - the latter was observed).
 # The in-flight slot request (Slot1, AID1) from step 2 arrives at TM and succeeds, as Slot1 is free to allocate. TM offers Slot1 to JM, which acknowledges it, so TM marks Slot1 ACTIVE with AID1. At this point, allocationIDTaskSlotMap[AID1] = Slot1 in TM, and JM's allocatedSlots[AID1] = Slot1.
 # The next in-flight slot request (Slot2, AID1) from step 5 arrives at TM. As Slot2 is still free, TM marks it ALLOCATED, offers Slot2 to JM, and *"overwrites allocationIDTaskSlotMap[AID1] to Slot2"*.
 # From step 7, JM has allocatedSlots[AID1] = Slot1, which leads JM to reject the offer, since the same AID is already bound to another slot.
 # TM gets the rejected offer for (Slot2, AID1) and frees Slot2. As part of that, it removes allocationIDTaskSlotMap[AID1]. *Here Slot1 is still marked as ALLOCATED with AID1, but allocationIDTaskSlotMap contains nothing for AID1* (a minimal code model of this end state follows the list).
 # From this point on, RM believes that Slot1 is allocated for AID1, as does JM, which proceeds with task deployment for AID1. In TM, AID1 is not allocated at all, since allocationIDTaskSlotMap[AID1] = null. Task deployment fails with *TaskSubmissionException("No task slot allocated for job ID")*.
 # Any slot release from JM (e.g., by another heartbeat timeout) removes the allocated slot (Slot1, AID1) from allocatedSlots and availableSlots, while freeing the slot with AID1 in TM is a no-op due to allocationIDTaskSlotMap[AID1] = null.
 # Any further scheduling fails with *NoResourceAvailableException("Could not allocate all requires slots within timeout")*, unless an active resource manager is used.
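
To make the end state of steps 7-10 concrete, here is a minimal, self-contained model of the two structures involved (plain Java with strings and integers standing in for TaskSlot and AllocationID; this is not Flink code):
{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal model of the race in steps 7-10 (illustrative only, not Flink code).
 * Two slots end up allocated under the same AllocationID, and freeing by
 * AllocationID strands the first one.
 */
public class ZombieSlotDemo {

    public static void main(String[] args) {
        // slot index -> AllocationID currently occupying it (stand-in for TaskSlot state)
        Map<Integer, String> taskSlots = new HashMap<>();
        // AllocationID -> slot index (stand-in for allocationIDTaskSlotMap)
        Map<String, Integer> allocationIdToSlot = new HashMap<>();

        // Step 7: in-flight request (Slot1, AID1) succeeds.
        taskSlots.put(1, "AID1");
        allocationIdToSlot.put("AID1", 1);

        // Step 8: retried request (Slot2, AID1) also succeeds and overwrites the map entry.
        taskSlots.put(2, "AID1");
        allocationIdToSlot.put("AID1", 2);

        // Steps 9-10: JM rejects the offer for Slot2, so TM frees by AllocationID,
        // which only knows about the latest entry (Slot2).
        Integer slotToFree = allocationIdToSlot.remove("AID1");
        taskSlots.remove(slotToFree);

        // Resulting inconsistency: Slot1 is still "allocated" for AID1, but AID1
        // no longer resolves to any slot, so it can never be freed or used.
        System.out.println("taskSlots = " + taskSlots);                   // {1=AID1}
        System.out.println("allocationIdToSlot = " + allocationIdToSlot); // {}
    }
}
{code}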

h2. Repro method

The repro I have is more of a stress test than a deterministic one: a constantly 
failing application (which also does not react to cancellation, so as to prolong 
fail-over), combined with a short heartbeat timeout (<=1s) to emulate resource 
overload without imposing an arbitrary resource load (I enabled DEBUG logging, 
which could also add a bit more load). Apart from the repro test, the real 
applications that ran into this issue were heavily loaded (high slot 
oversubscription) and doing continuous fail-overs (caused by a code error).
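
For reference, the short heartbeat timeout can be set in flink-conf.yaml; the value below is only an example matching the "<=1s" mentioned above, not a recommendation:
{code}
# flink-conf.yaml (example repro setting, not a recommendation)
# heartbeat.timeout is in milliseconds; 1000 ms matches the "<=1s" above.
heartbeat.timeout: 1000
{code}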
h2. Experiment

I have tested a simple solution: have *TaskSlotTable.allocateSlot* check whether 
*allocationIDTaskSlotMap* already has a task slot entry for the requested AID and, 
if so, reject the allocation up front. The redundant slot request then fails fast 
without reaching reservation and slot offer, preventing the 
*allocationIDTaskSlotMap* overwrite and the inconsistent allocation tables. In the 
example above, the fix replaces steps 8, 9, and 10 with the RM getting a 
*SlotAllocationException* while handling the failed slot request. The RM may retry 
the pending request via handleFailedSlotRequest, but it stops retrying once the 
next heartbeat reports that the pending slot was allocated by another AID, or once 
the slot request eventually times out. My test with the fix ran for a while, hit 
this race multiple times, and survived without slot allocation failure.
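
A rough sketch of the tested guard (simplified, not a verbatim patch; the rest of the method follows the reservation path sketched in the Summary):
{code:java}
// Sketch of the tested guard at the top of TaskSlotTable.allocateSlot
// (simplified, not a verbatim patch).
public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) {
    TaskSlot existing = allocationIDTaskSlotMap.get(allocationId);
    if (existing != null) {
        // Fail fast: the AllocationID is already bound to a task slot, so the
        // redundant request never reserves another slot, never offers it to the
        // JM, and never overwrites allocationIDTaskSlotMap. The caller surfaces
        // this to the RM as a failed slot request (SlotAllocationException),
        // which is then handled by handleFailedSlotRequest.
        return false;
    }

    // ... original reservation path continues as before ...
    TaskSlot taskSlot = taskSlots.get(index);
    boolean result = taskSlot.allocate(jobId, allocationId);
    if (result) {
        allocationIDTaskSlotMap.put(allocationId, taskSlot);
        slotsPerJob.computeIfAbsent(jobId, id -> new HashSet<>()).add(allocationId);
    }
    return result;
}
{code}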

As my experience is with a somewhat old version (1.6.2), it would be good to 
discuss what other unexpected behavior could arise and which related fixes have 
already been incorporated in the mainline.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
