[ 
https://issues.apache.org/jira/browse/YARN-11191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578409#comment-17578409
 ] 

Yuan Luo commented on YARN-11191:
---------------------------------

 
{code:java}
public class ThreadLockTest {

  /**
   * Timestamp formatter shared by every demo thread.
   *
   * <p>{@link SimpleDateFormat} keeps mutable per-instance state and is not thread-safe. Yet
   * several demo threads format timestamps through this single instance, some of them at the
   * same moment (e.g. right after the threads are started). Declaring the core
   * {@code format(Date, StringBuffer, FieldPosition)} override {@code synchronized} serializes
   * those calls. {@link DateFormat#format(java.util.Date)} is final and delegates to that method,
   * so existing {@code sdf.format(new Date())} callers are covered without changing the field's
   * type.
   */
  private static final DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") {
    @Override
    public synchronized StringBuffer format(
        java.util.Date date, StringBuffer toAppendTo, java.text.FieldPosition pos) {
      return super.format(date, toAppendTo, pos);
    }
  };

  /**
   * Reproduces the YARN-11191 deadlock with two plain {@link ReentrantReadWriteLock}s.
   *
   * <p>The "queue" lock stands in for the csqueue lock, and the "premmption" lock stands in for
   * the PreemptionManager lock. Three threads are started together and ordered only by their
   * sleeps, so the times below are approximate:
   * <ul>
   *   <li>t=0s: {@code schedulerThread} takes the queue read lock, and
   *       {@code refreshQueueThread} takes the preemption write lock.</li>
   *   <li>t=5s: {@code otherThread} requests the queue write lock and waits behind the
   *       scheduler's read lock.</li>
   *   <li>t=10s: {@code refreshQueueThread} calls {@code queueReadLock.lock()}. Under the
   *       non-fair rule described in YARN-11191, this read request waits because a write request
   *       is already first in the queue.</li>
   *   <li>t=15s: {@code schedulerThread} calls {@code premmptionReadLock.lock()}, which waits
   *       for the write lock held by {@code refreshQueueThread}.</li>
   * </ul>
   * The threads then wait on each other in a cycle. The {@code join()} calls at the end are
   * therefore expected never to return, and the JVM has to be stopped externally.
   *
   * @throws InterruptedException if the calling thread is interrupted while joining
   */
  public static void BeforeFixDeadLock() throws InterruptedException {
    System.out.println(
        "BeforeFixDeadLock test start, this will cause deadlock......");
    ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock queueReadLock = queueLock.readLock();
    ReentrantReadWriteLock.WriteLock queueWriteLock = queueLock.writeLock();

    ReentrantReadWriteLock premmptionLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock premmptionReadLock =
        premmptionLock.readLock();
    ReentrantReadWriteLock.WriteLock premmptionWriteLock =
        premmptionLock.writeLock();

    // Prints a progress line prefixed with the wall-clock time so the interleaving is visible.
    java.util.function.Consumer<String> logStep = msg ->
        System.out.println("current time: " + sdf.format(new Date()) + ", " + msg);
    // Sleeps to stage the interleaving. An interrupt is recorded on the thread rather than
    // dropped.
    java.util.function.LongConsumer sleepMillis = millis -> {
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    Thread schedulerThread = new Thread(() -> {
      logStep.accept("schedulerThread start!");
      // hold: csqueue.readLock (as ParentQueue.canAssignToThisQueue does)
      queueReadLock.lock();
      try {
        logStep.accept("schedulerThread get queueReadLock!");
        sleepMillis.accept(15_000L);

        // require: PremmptionManager.readLock
        premmptionReadLock.lock();
        try {
          logStep.accept("schedulerThread get premmptionReadLock!");
        } finally {
          premmptionReadLock.unlock();
        }
      } finally {
        queueReadLock.unlock();
      }
      logStep.accept("schedulerThread finish!");
    });

    Thread refreshQueueThread = new Thread(() -> {
      logStep.accept("refreshQueueThread start!");
      // hold: PremmptionManager.writeLock (as the refreshQueue path does)
      premmptionWriteLock.lock();
      try {
        logStep.accept("refreshQueueThread get premmptionWriteLock!");
        sleepMillis.accept(10_000L);

        // require: csqueue.readLock. This plain lock() is the call that gets stuck behind
        // otherThread's queued write request.
        queueReadLock.lock();
        try {
          logStep.accept("refreshQueueThread get queueReadLock!");
        } finally {
          queueReadLock.unlock();
        }
      } finally {
        premmptionWriteLock.unlock();
      }
      logStep.accept("refreshQueueThread finish!");
    });

    Thread otherThread = new Thread(() -> {
      // Request the queue write lock after schedulerThread holds the queue READ lock (t=0s) but
      // before refreshQueueThread asks for the queue read lock (t=10s). That puts this write
      // request first in the queue when the refresh thread arrives.
      sleepMillis.accept(5_000L);
      logStep.accept("otherThread start!");
      queueWriteLock.lock();
      try {
        logStep.accept("otherThread get queueWriteLock!");
      } finally {
        queueWriteLock.unlock();
      }
      logStep.accept("otherThread finish!");
    });

    schedulerThread.start();
    refreshQueueThread.start();
    otherThread.start();

    refreshQueueThread.join();
    schedulerThread.join();
    otherThread.join();
  }

  /**
   * Runs the same three-thread scenario as {@link #BeforeFixDeadLock()} with the proposed fix.
   *
   * <p>The fix: {@code refreshQueueThread} acquires the queue read lock with a
   * {@link ReentrantReadWriteLock.ReadLock#tryLock()} polling loop instead of {@code lock()}.
   * {@code tryLock()} only fails while the write lock is actually held by another thread. It is
   * therefore not held back by {@code otherThread}'s merely queued write request, so the
   * sequence (times approximate) is:
   * <ul>
   *   <li>t=10s: {@code refreshQueueThread} gets the queue read lock and then releases both of
   *       its locks.</li>
   *   <li>t=15s: {@code schedulerThread} gets the now-free preemption read lock and releases
   *       everything.</li>
   *   <li>Finally {@code otherThread} obtains the queue write lock.</li>
   * </ul>
   * All three threads finish and the {@code join()} calls return.
   *
   * @throws InterruptedException if the calling thread is interrupted while joining
   */
  public static void AfterFixDeadLock() throws InterruptedException {
    System.out.println("AfterFixDeadLock test start......");
    ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock queueReadLock = queueLock.readLock();
    ReentrantReadWriteLock.WriteLock queueWriteLock = queueLock.writeLock();

    ReentrantReadWriteLock premmptionLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock premmptionReadLock =
        premmptionLock.readLock();
    ReentrantReadWriteLock.WriteLock premmptionWriteLock =
        premmptionLock.writeLock();

    // Prints a progress line prefixed with the wall-clock time so the interleaving is visible.
    java.util.function.Consumer<String> logStep = msg ->
        System.out.println("current time: " + sdf.format(new Date()) + ", " + msg);
    // Sleeps to stage the interleaving. An interrupt is recorded on the thread rather than
    // dropped.
    java.util.function.LongConsumer sleepMillis = millis -> {
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    Thread schedulerThread = new Thread(() -> {
      logStep.accept("schedulerThread start!");
      // hold: csqueue.readLock (as ParentQueue.canAssignToThisQueue does)
      queueReadLock.lock();
      try {
        logStep.accept("schedulerThread get queueReadLock!");
        sleepMillis.accept(15_000L);

        // require: PremmptionManager.readLock
        premmptionReadLock.lock();
        try {
          logStep.accept("schedulerThread get premmptionReadLock!");
        } finally {
          premmptionReadLock.unlock();
        }
      } finally {
        queueReadLock.unlock();
      }
      logStep.accept("schedulerThread finish!");
    });

    Thread refreshQueueThread = new Thread(() -> {
      logStep.accept("refreshQueueThread start!");
      // hold: PremmptionManager.writeLock (as the refreshQueue path does)
      premmptionWriteLock.lock();
      try {
        logStep.accept("refreshQueueThread get premmptionWriteLock!");
        sleepMillis.accept(10_000L);

        // require: csqueue.readLock. tryLock() succeeds whenever the write lock is not actually
        // held, even if a writer is queued. Poll with a short park only while a writer really
        // owns the lock.
        while (!queueReadLock.tryLock()) {
          LockSupport.parkNanos(10000);
        }
        try {
          logStep.accept("refreshQueueThread get queueReadLock!");
        } finally {
          queueReadLock.unlock();
        }
      } finally {
        premmptionWriteLock.unlock();
      }
      logStep.accept("refreshQueueThread finish!");
    });

    Thread otherThread = new Thread(() -> {
      // Request the queue write lock after schedulerThread holds the queue READ lock (t=0s) but
      // before refreshQueueThread asks for the queue read lock (t=10s). This recreates the exact
      // queue state that deadlocks in BeforeFixDeadLock.
      sleepMillis.accept(5_000L);
      logStep.accept("otherThread start!");
      queueWriteLock.lock();
      try {
        logStep.accept("otherThread get queueWriteLock!");
      } finally {
        queueWriteLock.unlock();
      }
      logStep.accept("otherThread finish!");
    });

    schedulerThread.start();
    refreshQueueThread.start();
    otherThread.start();

    refreshQueueThread.join();
    schedulerThread.join();
    otherThread.join();
  }

  /**
   * Runs the fixed scenario first and then the original scenario, separated by a dashed line.
   *
   * <p>Presumably ordered this way because {@code BeforeFixDeadLock} announces that it "will
   * cause deadlock" and then joins its worker threads. If that deadlock occurs, the call never
   * returns, so anything placed after it would not run.
   *
   * <p>NOTE(review): in this paste the closing brace at the end of {@code main} is the last
   * brace before the {@code code} markup. The closing brace of {@code ThreadLockTest} appears to
   * be missing and must be added for the class to compile.
   */
  public static void main(String[] args) throws InterruptedException {
    AfterFixDeadLock();
    System.out.println("--------------------------------");
    BeforeFixDeadLock();
  }{code}
 

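I wrote a Java demo that reproduces the problem and verifies that the fix works.
The fix works because ReadLock.tryLock() acquires the read lock whenever the write
lock is not actually held, even if a write request is already queued. The refresh
thread therefore no longer blocks behind the waiting writer.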

 

> Global Scheduler refreshQueue cause deadLock 
> ---------------------------------------------
>
>                 Key: YARN-11191
>                 URL: https://issues.apache.org/jira/browse/YARN-11191
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: capacity scheduler
>    Affects Versions: 2.9.0, 3.0.0, 3.1.0, 2.10.0, 3.2.0, 3.3.0
>            Reporter: ben yang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 1.jstack, Lock holding status.png, YARN-11191.001.patch
>
>
> This is a potential bug that may affect every cluster with preemption enabled. In 
> our current version with preemption enabled, the CapacityScheduler calls the 
> refreshQueue method of the PreemptionManager when it refreshes queues. This 
> process holds the PreemptionManager write lock and requires the csqueue read 
> lock. Meanwhile, ParentQueue.canAssignToThisQueue holds the csqueue read lock 
> and requires the PreemptionManager read lock.
> A deadlock is possible at this point because of one rule of the non-fair 
> policy: when the lock is already held in read mode and the first request in 
> the lock's wait queue is a write-lock request, other read-lock requests can't 
> acquire the lock.
> So the potential deadlock is:
> {code:java}
> CapacityScheduler.refreshQueue: hold: PreemptionManager.writeLock
>                                 require: csqueue.readLock
> CapacityScheduler.schedule: hold: csqueue.readLock
>                             require: PreemptionManager.readLock
> other thread (completeContainer, release resource, etc.): require: 
> csqueue.writeLock 
> {code}
> The jstack logs at the time were as follows (the output is not included in this message; it is presumably the attached 1.jstack):



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to