Repository: tez
Updated Branches:
  refs/heads/master a1f2da8eb -> 022df7218


TEZ-3893. Tez Local Mode can hang in some cases. (Jonathan Eagles via jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/022df721
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/022df721
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/022df721

Branch: refs/heads/master
Commit: 022df7218afbb2c940ddc4447246dea5a546c759
Parents: a1f2da8
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Feb 14 10:14:34 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Feb 14 10:14:34 2018 -0600

----------------------------------------------------------------------
 .../dag/app/rm/LocalTaskSchedulerService.java   | 87 ++++++++++----------
 .../tez/dag/app/rm/TestLocalTaskScheduler.java  | 13 +--
 .../app/rm/TestLocalTaskSchedulerService.java   | 35 ++++----
 3 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 3b034cd..04e79a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -19,8 +19,8 @@
 package org.apache.tez.dag.app.rm;
 
 import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -51,7 +51,7 @@ public class LocalTaskSchedulerService extends TaskScheduler {
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalTaskSchedulerService.class);
 
   final ContainerSignatureMatcher containerSignatureMatcher;
-  final PriorityBlockingQueue<TaskRequest> taskRequestQueue;
+  final LinkedBlockingQueue<TaskRequest> taskRequestQueue;
   final Configuration conf;
   AsyncDelegateRequestHandler taskRequestHandler;
   Thread asyncDelegateRequestThread;
@@ -62,7 +62,7 @@ public class LocalTaskSchedulerService extends TaskScheduler {
 
   public LocalTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
     super(taskSchedulerContext);
-    taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
+    taskRequestQueue = new LinkedBlockingQueue<>();
     taskAllocations = new LinkedHashMap<Object, Container>();
     this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
     this.containerSignatureMatcher = 
taskSchedulerContext.getContainerSignatureMatcher();
@@ -313,29 +313,31 @@ public class LocalTaskSchedulerService extends TaskScheduler {
   }
 
   static class AsyncDelegateRequestHandler implements Runnable {
-    final BlockingQueue<TaskRequest> taskRequestQueue;
+    final LinkedBlockingQueue<TaskRequest> clientRequestQueue;
+    final PriorityBlockingQueue<AllocateTaskRequest> taskRequestQueue;
     final LocalContainerFactory localContainerFactory;
     final HashMap<Object, Container> taskAllocations;
     final TaskSchedulerContext taskSchedulerContext;
     final int MAX_TASKS;
 
-    AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue,
+    AsyncDelegateRequestHandler(LinkedBlockingQueue<TaskRequest> 
clientRequestQueue,
         LocalContainerFactory localContainerFactory,
         HashMap<Object, Container> taskAllocations,
         TaskSchedulerContext taskSchedulerContext,
         Configuration conf) {
-      this.taskRequestQueue = taskRequestQueue;
+      this.clientRequestQueue = clientRequestQueue;
       this.localContainerFactory = localContainerFactory;
       this.taskAllocations = taskAllocations;
       this.taskSchedulerContext = taskSchedulerContext;
       this.MAX_TASKS = 
conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
           TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
+      this.taskRequestQueue = new PriorityBlockingQueue<>();
     }
 
     public void addAllocateTaskRequest(Object task, Resource capability, 
Priority priority,
         Object clientCookie) {
       try {
-        taskRequestQueue.put(new AllocateTaskRequest(task, capability, 
priority, clientCookie));
+        clientRequestQueue.put(new AllocateTaskRequest(task, capability, 
priority, clientCookie));
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -343,57 +345,54 @@ public class LocalTaskSchedulerService extends TaskScheduler {
 
     public boolean addDeallocateTaskRequest(Object task) {
       try {
-        taskRequestQueue.put(new DeallocateTaskRequest(task));
+        clientRequestQueue.put(new DeallocateTaskRequest(task));
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      synchronized(taskRequestQueue) {
-        taskRequestQueue.notify();
-      }
       return true;
     }
 
-    boolean shouldWait() {
-      return taskAllocations.size() >= MAX_TASKS;
+    boolean shouldProcess() {
+      return !taskRequestQueue.isEmpty() && taskAllocations.size() < MAX_TASKS;
     }
 
     @Override
     public void run() {
-      while(!Thread.currentThread().isInterrupted()) {
-        synchronized(taskRequestQueue) {
-          try {
-            if (shouldWait()) {
-              taskRequestQueue.wait();
-            }
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
+      while (!Thread.currentThread().isInterrupted()) {
+        dispatchRequest();
+        while (shouldProcess()) {
+          allocateTask();
         }
-        processRequest();
       }
     }
 
-    void processRequest() {
-        try {
-          TaskRequest request = taskRequestQueue.take();
-          if (request instanceof AllocateTaskRequest) {
-            allocateTask((AllocateTaskRequest)request);
-          }
-          else if (request instanceof DeallocateTaskRequest) {
-            deallocateTask((DeallocateTaskRequest)request);
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        } catch (NullPointerException e) {
-          LOG.warn("Task request was badly constructed");
+    void dispatchRequest() {
+      try {
+        TaskRequest request = clientRequestQueue.take();
+        if (request instanceof AllocateTaskRequest) {
+          taskRequestQueue.put((AllocateTaskRequest)request);
+        }
+        else if (request instanceof DeallocateTaskRequest) {
+          deallocateTask((DeallocateTaskRequest)request);
+        }
+        else {
+          LOG.error("Unknown task request message: " + request);
         }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
 
-    void allocateTask(AllocateTaskRequest request) {
-      Container container = 
localContainerFactory.createContainer(request.capability,
-          request.priority);
-      taskAllocations.put(request.task, container);
-      taskSchedulerContext.taskAllocated(request.task, request.clientCookie, 
container);
+    void allocateTask() {
+      try {
+        AllocateTaskRequest request = taskRequestQueue.take();
+        Container container = 
localContainerFactory.createContainer(request.capability,
+            request.priority);
+        taskAllocations.put(request.task, container);
+        taskSchedulerContext.taskAllocated(request.task, request.clientCookie, 
container);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
 
     void deallocateTask(DeallocateTaskRequest request) {
@@ -403,13 +402,13 @@ public class LocalTaskSchedulerService extends TaskScheduler {
       }
       else {
         boolean deallocationBeforeAllocation = false;
-        Iterator<TaskRequest> iter = taskRequestQueue.iterator();
+        Iterator<AllocateTaskRequest> iter = taskRequestQueue.iterator();
         while (iter.hasNext()) {
           TaskRequest taskRequest = iter.next();
-          if (taskRequest instanceof AllocateTaskRequest && 
taskRequest.task.equals(request.task)) {
+          if (taskRequest.task.equals(request.task)) {
             iter.remove();
             deallocationBeforeAllocation = true;
-            LOG.info("deallcation happen before allocation for task:" + 
request.task);
+            LOG.info("Deallocation request before allocation for task:" + 
request.task);
             break;
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
index 2ada2f1..36505c2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.app.rm;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -57,11 +57,11 @@ public class TestLocalTaskScheduler {
     LocalContainerFactory containerFactory = new 
LocalContainerFactory(appAttemptId, 1000);
 
     HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, 
Container>();
-    PriorityBlockingQueue<TaskRequest> taskRequestQueue = new 
PriorityBlockingQueue<TaskRequest>();
+    LinkedBlockingQueue<TaskRequest> clientRequestQueue = new 
LinkedBlockingQueue<>();
 
     // Object under test
     AsyncDelegateRequestHandler requestHandler =
-      new AsyncDelegateRequestHandler(taskRequestQueue,
+      new AsyncDelegateRequestHandler(clientRequestQueue,
           containerFactory,
           taskAllocations,
           mockContext,
@@ -71,17 +71,18 @@ public class TestLocalTaskScheduler {
     for (int i = 0; i < MAX_TASKS; i++) {
       Priority priority = Priority.newInstance(20);
       requestHandler.addAllocateTaskRequest(new Long(i), null, priority, null);
-      requestHandler.processRequest();
+      requestHandler.dispatchRequest();
+      requestHandler.allocateTask();
     }
 
     // Only MAX_TASKS number of tasks should have been allocated
     Assert.assertEquals("Wrong number of allocate tasks", MAX_TASKS, 
taskAllocations.size());
-    Assert.assertTrue("Another allocation should not fit", 
requestHandler.shouldWait());
+    Assert.assertTrue("Another allocation should not fit", 
!requestHandler.shouldProcess());
 
     // Deallocate down to zero
     for (int i = 0; i < MAX_TASKS; i++) {
       requestHandler.addDeallocateTaskRequest(new Long(i));
-      requestHandler.processRequest();
+      requestHandler.dispatchRequest();
     }
 
     // All allocated tasks should have been removed

http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 3b2de34..c2daf84 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -19,7 +19,7 @@
 package org.apache.tez.dag.app.rm;
 
 import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -91,6 +91,9 @@ public class TestLocalTaskSchedulerService {
     taskSchedulerService.initialize();
     taskSchedulerService.start();
 
+    // create a task that fills the task allocation queue
+    Task dummy_task = mock(Task.class);
+    taskSchedulerService.allocateTask(dummy_task, Resource.newInstance(1024, 
1), null, null, Priority.newInstance(1), null, null);
     Task task = mock(Task.class);
     taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), 
null, null, Priority.newInstance(1), null, null);
     taskSchedulerService.deallocateTask(task, false, null, null);
@@ -98,10 +101,10 @@ public class TestLocalTaskSchedulerService {
     taskSchedulerService.startRequestHandlerThread();
 
     MockAsyncDelegateRequestHandler requestHandler = 
taskSchedulerService.getRequestHandler();
-    requestHandler.drainRequest(1);
+    requestHandler.drainRequest(3);
     assertEquals(1, requestHandler.deallocateCount);
     // The corresponding AllocateTaskRequest will be removed, so won't been 
processed.
-    assertEquals(0, requestHandler.allocateCount);
+    assertEquals(1, requestHandler.allocateCount);
     taskSchedulerService.shutdown();
   }
 
@@ -170,10 +173,10 @@ public class TestLocalTaskSchedulerService {
 
       public int allocateCount = 0;
       public int deallocateCount = 0;
-      public int processedCount =0;
+      public int dispatchCount = 0;
 
       MockAsyncDelegateRequestHandler(
-          BlockingQueue<TaskRequest> taskRequestQueue,
+          LinkedBlockingQueue<TaskRequest> taskRequestQueue,
           LocalContainerFactory localContainerFactory,
           HashMap<Object, Container> taskAllocations,
           TaskSchedulerContext appClientDelegate, Configuration conf) {
@@ -182,13 +185,19 @@ public class TestLocalTaskSchedulerService {
       }
 
       @Override
-      void processRequest() {
-        super.processRequest();
-        processedCount ++;
+      void dispatchRequest() {
+        super.dispatchRequest();
+        dispatchCount++;
+      }
+
+      @Override
+      void allocateTask() {
+        super.allocateTask();
+        allocateCount++;
       }
 
       public void drainRequest(int count) {
-        while(processedCount != count || !taskRequestQueue.isEmpty()) {
+        while(dispatchCount != count || !clientRequestQueue.isEmpty()) {
           try {
             Thread.sleep(100);
           } catch (InterruptedException e) {
@@ -198,15 +207,9 @@ public class TestLocalTaskSchedulerService {
       }
 
       @Override
-      void allocateTask(AllocateTaskRequest request) {
-        super.allocateTask(request);
-        allocateCount ++;
-      }
-
-      @Override
       void deallocateTask(DeallocateTaskRequest request) {
         super.deallocateTask(request);
-        deallocateCount ++;
+        deallocateCount++;
       }
     }
   }

Reply via email to