[
https://issues.apache.org/jira/browse/YARN-11191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578409#comment-17578409
]
Yuan Luo commented on YARN-11191:
---------------------------------
{code:java}
public class ThreadLockTest {
// NOTE: SimpleDateFormat is not thread-safe. Do not call format() on this
// shared instance from several threads at the same time without external
// synchronization; prefer java.time.format.DateTimeFormatter, which is
// immutable. The field is final so the shared instance cannot be swapped out.
private static final DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
 * Reproduces the YARN-11191 deadlock between a queue refresh, the scheduler
 * and an unrelated queue writer, using plain {@link ReentrantReadWriteLock}s.
 *
 * <p>The refresh thread acquires the queue read lock with a blocking
 * {@code lock()}, so this method is expected to deadlock and never return.
 *
 * @throws InterruptedException if the calling thread is interrupted while
 *     waiting for the worker threads
 */
public static void BeforeFixDeadLock() throws InterruptedException {
  runRefreshQueueScenario(
      "BeforeFixDeadLock test start, this will cause deadlock......", false);
}

/**
 * Runs the same scenario as {@link #BeforeFixDeadLock()} with the proposed
 * fix. The refresh thread acquires the queue read lock by spinning on
 * {@code tryLock()}, so all three worker threads run to completion.
 *
 * @throws InterruptedException if the calling thread is interrupted while
 *     waiting for the worker threads
 */
public static void AfterFixDeadLock() throws InterruptedException {
  runRefreshQueueScenario("AfterFixDeadLock test start......", true);
}

/**
 * Drives three threads through the lock ordering from YARN-11191 and waits for
 * them to finish. The sleeps arrange the following timeline (seconds from
 * start):
 * <ul>
 *   <li>0: schedulerThread holds the queue read lock and refreshQueueThread
 *       holds the preemption write lock;</li>
 *   <li>5: otherThread requests the queue write lock and has to wait, because
 *       schedulerThread still holds the read lock;</li>
 *   <li>10: refreshQueueThread requests the queue read lock;</li>
 *   <li>15: schedulerThread requests the preemption read lock, which is held
 *       in write mode by refreshQueueThread.</li>
 * </ul>
 * With a blocking {@code lock()} at step 10, the new reader waits behind the
 * queued writer (per the non-fair read-lock rule described in YARN-11191),
 * which closes the wait cycle. With {@code tryLock()}, the read lock is taken
 * because no thread currently holds the write lock, so refreshQueueThread
 * proceeds and releases the preemption lock.
 *
 * @param banner line printed (without timestamp) before the threads start
 * @param useTryLock {@code true} to have refreshQueueThread spin on
 *     {@code tryLock()} (the fix); {@code false} to block in {@code lock()}
 * @throws InterruptedException if the calling thread is interrupted while
 *     joining the worker threads
 */
private static void runRefreshQueueScenario(String banner, boolean useTryLock)
    throws InterruptedException {
  System.out.println(banner);
  // The three workers log concurrently. SimpleDateFormat is not thread-safe,
  // whereas DateTimeFormatter is immutable and can be shared freely.
  java.time.format.DateTimeFormatter timeFormat =
      java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  java.util.function.Consumer<String> log = event -> System.out.println(
      "current time: " + java.time.LocalDateTime.now().format(timeFormat)
          + ", " + event);

  ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
  ReentrantReadWriteLock.ReadLock queueReadLock = queueLock.readLock();
  ReentrantReadWriteLock.WriteLock queueWriteLock = queueLock.writeLock();
  ReentrantReadWriteLock preemptionLock = new ReentrantReadWriteLock();
  ReentrantReadWriteLock.ReadLock preemptionReadLock = preemptionLock.readLock();
  ReentrantReadWriteLock.WriteLock preemptionWriteLock =
      preemptionLock.writeLock();

  Thread schedulerThread = new Thread(() -> {
    log.accept("schedulerThread start!");
    // hold: csqueue.readLock
    queueReadLock.lock();
    try {
      log.accept("schedulerThread get queueReadLock!");
      if (!pause(15)) {
        return; // interrupted: abandon the run; finally releases the lock
      }
      // require: PreemptionManager.readLock
      preemptionReadLock.lock();
      try {
        log.accept("schedulerThread get premmptionReadLock!");
      } finally {
        preemptionReadLock.unlock();
      }
    } finally {
      queueReadLock.unlock();
    }
    log.accept("schedulerThread finish!");
  }, "schedulerThread");

  Thread refreshQueueThread = new Thread(() -> {
    log.accept("refreshQueueThread start!");
    // hold: PreemptionManager.writeLock
    preemptionWriteLock.lock();
    try {
      log.accept("refreshQueueThread get premmptionWriteLock!");
      if (!pause(10)) {
        return; // interrupted: abandon the run; finally releases the lock
      }
      // require: csqueue.readLock
      if (useTryLock) {
        // The fix: ReadLock.tryLock() succeeds whenever no thread holds the
        // write lock, even though otherThread's write request is queued,
        // whereas lock() would wait behind that queued writer.
        while (!queueReadLock.tryLock()) {
          if (Thread.currentThread().isInterrupted()) {
            return; // parkNanos would not block any more; stop spinning
          }
          LockSupport.parkNanos(10000);
        }
      } else {
        queueReadLock.lock();
      }
      try {
        log.accept("refreshQueueThread get queueReadLock!");
      } finally {
        queueReadLock.unlock();
      }
    } finally {
      preemptionWriteLock.unlock();
    }
    log.accept("refreshQueueThread finish!");
  }, "refreshQueueThread");

  Thread otherThread = new Thread(() -> {
    // Request the queue write lock after schedulerThread holds the queue READ
    // lock and before refreshQueueThread asks for it, so the writer is already
    // waiting when refreshQueueThread arrives.
    if (!pause(5)) {
      return;
    }
    log.accept("otherThread start!");
    queueWriteLock.lock();
    try {
      log.accept("otherThread get queueWriteLock!");
    } finally {
      queueWriteLock.unlock();
    }
    log.accept("otherThread finish!");
  }, "otherThread");

  schedulerThread.start();
  refreshQueueThread.start();
  otherThread.start();
  refreshQueueThread.join();
  schedulerThread.join();
  otherThread.join();
}

/**
 * Sleeps for the given number of seconds.
 *
 * @param seconds how long to sleep
 * @return {@code true} if the full sleep elapsed; {@code false} if the thread
 *     was interrupted, in which case the interrupt flag is restored so the
 *     caller can stop cleanly
 */
private static boolean pause(long seconds) {
  try {
    java.util.concurrent.TimeUnit.SECONDS.sleep(seconds);
    return true;
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return false;
  }
}
/**
 * Runs the fixed scenario first, then the original one.
 *
 * <p>The order matters. BeforeFixDeadLock() is expected to deadlock, so the
 * call never returns and the process has to be stopped manually. It therefore
 * has to come last, after AfterFixDeadLock() has shown that the fix lets every
 * thread finish.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while waiting for a scenario
 */
public static void main(String[] args) throws InterruptedException {
AfterFixDeadLock();
System.out.println("--------------------------------");
BeforeFixDeadLock();
}{code}
I wrote a Java demo that reproduces the problem, and then used it to verify
that the fix works.
> Global Scheduler refreshQueue cause deadLock
> ---------------------------------------------
>
> Key: YARN-11191
> URL: https://issues.apache.org/jira/browse/YARN-11191
> Project: Hadoop YARN
> Issue Type: Bug
> Components: capacity scheduler
> Affects Versions: 2.9.0, 3.0.0, 3.1.0, 2.10.0, 3.2.0, 3.3.0
> Reporter: ben yang
> Priority: Major
> Labels: pull-request-available
> Attachments: 1.jstack, Lock holding status.png, YARN-11191.001.patch
>
>
> This is a potential bug that may affect every cluster with preemption
> enabled. In our current version with preemption enabled, the CapacityScheduler
> calls the PreemptionManager's refreshQueue method when it refreshes its
> queues. This process holds the PreemptionManager write lock and requires the
> csqueue read lock. Meanwhile, ParentQueue.canAssignToThisQueue holds the
> csqueue read lock and requires the PreemptionManager read lock.
> This can deadlock because of a rule the read lock follows under the non-fair
> policy: when the lock is already held in read mode and the first request in
> the lock's wait queue is a write-lock request, further read-lock requests
> cannot acquire the lock and must wait as well.
> So the potential deadlock is:
> {code:java}
> CapacityScheduler.refreshQueue: hold: PremmptionManager.writeLock
> require: csqueue.readLock
> CapacityScheduler.schedule: hold: csqueue.readLock
> require: PremmptionManager.readLock
> other thread(completeContainer,release Resource,etc.): require:
> csqueue.writeLock
> {code}
> The jstack logs at the time were as follows:
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]