Repository: samza
Updated Branches:
  refs/heads/master 49dac97cb -> 1c0a60bb7


SAMZA-1021 : Remove the redundant poll waiting inside AsyncRunLoop blockIfBusy


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c0a60bb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c0a60bb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c0a60bb

Branch: refs/heads/master
Commit: 1c0a60bb74cc9188413ed99482e910e35359822d
Parents: 49dac97
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Thu Sep 22 11:42:13 2016 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Thu Sep 22 11:46:51 2016 -0700

----------------------------------------------------------------------
 .../org/apache/samza/task/AsyncRunLoop.java     | 27 +++++---------------
 1 file changed, 7 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1c0a60bb/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index a510bb0..9a21bf1 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -201,32 +201,23 @@ public class AsyncRunLoop implements Runnable {
   }
 
   /**
-   * Block the runloop thread if all tasks are busy. Due to limitation of non-blocking for the flow control,
-   * we block the run loop when there are no runnable tasks, or all tasks are idle (no pending messages) while
-   * chooser is empty too. When a task worker finishes or window/commit completes, it will resume the runloop.
+   * Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes,
+   * it will resume the runloop.
    */
   private void blockIfBusy(IncomingMessageEnvelope envelope) {
     synchronized (latch) {
       while (!shutdownNow && throwable == null) {
         for (AsyncTaskWorker worker : taskWorkers.values()) {
-          if (worker.state.isReady() && (envelope != null || worker.state.hasPendingOps())) {
-            // should continue running since the worker state is ready and there is either new message
-            // or some pending operations for the worker
+          if (worker.state.isReady()) {
+            // should continue running if any worker state is ready
+            // consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop
             return;
           }
         }
 
         try {
           log.trace("Block loop thread");
-
-          if (envelope == null) {
-            // If the envelope is null then we will wait for a poll interval, otherwise next choose() will
-            // return null immediately and we will have a busy loop
-            latch.wait(consumerMultiplexer.pollIntervalMs());
-            return;
-          } else {
-            latch.wait();
-          }
+          latch.wait();
         } catch (InterruptedException e) {
           throw new SamzaException("Run loop is interrupted", e);
         }
@@ -531,10 +522,6 @@ public class AsyncRunLoop implements Runnable {
       }
     }
 
-    private boolean hasPendingOps() {
-      return !pendingEnvelopQueue.isEmpty() || needCommit || needWindow;
-    }
-
     /**
      * Returns the next operation by this taskWorker
      */
@@ -616,4 +603,4 @@ public class AsyncRunLoop implements Runnable {
       return pendingEnvelope.envelope;
     }
   }
-}
\ No newline at end of file
+}

Reply via email to