Repository: samza
Updated Branches:
  refs/heads/master 7ae261afa -> 9126d373d


SAMZA-974 Support finite data sources that have a notion of end of stream


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9126d373
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9126d373
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9126d373

Branch: refs/heads/master
Commit: 9126d373d083787e9489f03c61ae59657705cfdc
Parents: 7ae261a
Author: vjagadish1989 <jvenk...@linkedin.com>
Authored: Fri Sep 30 23:07:30 2016 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Fri Sep 30 23:07:30 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 checkstyle/import-control.xml                   |   1 +
 .../samza/system/IncomingMessageEnvelope.java   |  24 ++-
 .../system/SystemStreamPartitionIterator.java   |  15 +-
 .../samza/task/EndOfStreamListenerTask.java     |  43 ++++
 .../clustermanager/ResourceRequestState.java    |  12 +-
 .../org/apache/samza/task/AsyncRunLoop.java     |  83 +++++++-
 .../samza/task/AsyncStreamTaskAdapter.java      |   9 +-
 .../apache/samza/container/TaskInstance.scala   |  12 +-
 .../apache/samza/system/SystemConsumers.scala   |  35 +++-
 .../TestContainerProcessManager.java            |   8 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java | 200 ++++++++++++++++++-
 .../samza/system/TestSystemConsumers.scala      |   6 +
 13 files changed, 407 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index ab257d3..2bea27b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -120,6 +120,7 @@ project(':samza-api') {
   apply plugin: 'checkstyle'
 
   dependencies {
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7e77702..db6f859 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -33,6 +33,7 @@
     <allow pkg="org.apache.commons" />
     <allow class="scala.collection.JavaConversions" />
     <allow class="scala.collection.JavaConverters" />
+    <allow class="scala.Option" />
     <allow pkg="scala.runtime" />
 
     <subpackage name="config">

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index cc860cf..0ced773 100644
--- 
a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ 
b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -19,11 +19,18 @@
 
 package org.apache.samza.system;
 
+import java.nio.charset.Charset;
+
 /**
- * This class represents a message envelope that is received by a StreamTask 
for each message that is received from a
+ * This class represents a message envelope that is received by a StreamTask 
for each message that is received from a
  * partition of a specific input stream.
  */
 public class IncomingMessageEnvelope {
+
+  // Offsets starting with an encoded NUL byte are reserved for end-of-stream.
+  private static final byte[] END_OF_STREAM_BYTES = 
"\0END_OF_STREAM".getBytes();
+  public static final String END_OF_STREAM_OFFSET = new 
String(END_OF_STREAM_BYTES, Charset.defaultCharset());
+
   private final SystemStreamPartition systemStreamPartition;
   private final String offset;
   private final Object key;
@@ -79,6 +86,21 @@ public class IncomingMessageEnvelope {
     return size;
   }
 
+  public boolean isEndOfStream() {
+    return END_OF_STREAM_OFFSET.equals(offset);
+  }
+
+  /**
+   * Builds an end-of-stream envelope for an SSP. This is used by a {@link 
SystemConsumer} implementation to
+   * indicate that it is at end-of-stream. The end-of-stream message should 
not be delivered to the task implementation.
+   *
+   * @param ssp The SSP that is at end-of-stream.
+   * @return an IncomingMessageEnvelope corresponding to end-of-stream for 
that SSP.
+   */
+  public static IncomingMessageEnvelope 
buildEndOfStreamEnvelope(SystemStreamPartition ssp) {
+    return new IncomingMessageEnvelope(ssp, END_OF_STREAM_OFFSET, null, null);
+  }
+
   @Override
   public int hashCode() {
     final int prime = 31;

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
 
b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
index a8f858a..d1d61ed 100644
--- 
a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
+++ 
b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.system;
 
+import org.apache.samza.SamzaException;
+
 import java.util.ArrayDeque;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -28,8 +30,6 @@ import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
 
-import org.apache.samza.SamzaException;
-
 /**
  * {@link java.util.Iterator} that wraps a
  * {@link org.apache.samza.system.SystemConsumer} to iterate over the messages
@@ -40,6 +40,7 @@ public class SystemStreamPartitionIterator implements 
Iterator<IncomingMessageEn
   private final SystemConsumer systemConsumer;
   private final Set<SystemStreamPartition> fetchSet;
   private Queue<IncomingMessageEnvelope> peeks;
+  private boolean endOfStreamReached = false;
 
   public SystemStreamPartitionIterator(SystemConsumer systemConsumer, 
SystemStreamPartition systemStreamPartition) {
     this(systemConsumer, systemStreamPartition, 1000);
@@ -67,7 +68,13 @@ public class SystemStreamPartitionIterator implements 
Iterator<IncomingMessageEn
       throw new NoSuchElementException();
     }
 
-    return peeks.poll();
+    IncomingMessageEnvelope envelope = peeks.poll();
+
+    if (envelope.isEndOfStream()) {
+      endOfStreamReached = true;
+    }
+
+    return envelope;
   }
 
   @Override
@@ -75,7 +82,7 @@ public class SystemStreamPartitionIterator implements 
Iterator<IncomingMessageEn
   }
 
   private void refresh() {
-    if (peeks.size() == 0) {
+    if (peeks.size() == 0 && !endOfStreamReached) {
       try {
         Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = 
systemConsumer.poll(fetchSet, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java 
b/samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java
new file mode 100644
index 0000000..0f62356
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ *
+ * The EndOfStreamListenerTask augments {@link StreamTask}, allowing implementors 
to specify code to be
+ * executed once end-of-stream has been reached for all input SSPs.
+ *
+ * While some streaming sources are infinite (like Kafka), others, such as 
HDFS and file-based sources, are bounded. For instance,
+ * file-based sources have the notion of EOF to indicate that there is no more 
data.
+ *
+ */
+public interface EndOfStreamListenerTask {
+
+  /**
+   * Guaranteed to be invoked when all SSPs processed by this task have 
reached their end-of-stream. Users can choose
+   * to invoke commit on the {@link TaskCoordinator} to commit changes.
+   *
+   * @param collector Contains the means of sending message envelopes to an 
output stream.
+   * @param coordinator Manages execution of tasks.
+   *
+   * @throws Exception Any exception types encountered during the execution of 
the processing task.
+   */
+  void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) 
throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index 39897c7..77c192e 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -133,15 +133,9 @@ public class ResourceRequestState {
                * requestCount != 0, it will be greater than the total request 
count for that host. Hence, it should be
                * assigned to ANY_HOST
                */
-              log.info(
-                  "The number of containers already allocated on {} is greater 
than what was " +
-                      "requested, which is {}. Hence, saving the samzaResource 
{} in the buffer for ANY_HOST",
-                  new Object[]{
-                    hostName,
-                    requestCountOnThisHost,
-                    samzaResource.getResourceID()
-                  }
-              );
+              log.info("The number of containers already allocated on {} is 
greater than what was " +
+                              "requested, which is {}. Hence, saving the 
samzaResource {} in the buffer for ANY_HOST",
+                      new Object[]{hostName, requestCountOnThisHost, 
samzaResource.getResourceID()});
               addToAllocatedResourceList(ANY_HOST, samzaResource);
             }
           }

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 9a21bf1..77eceea 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -23,6 +23,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,6 +32,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstance;
@@ -53,6 +56,7 @@ public class AsyncRunLoop implements Runnable {
   private final Map<TaskName, AsyncTaskWorker> taskWorkers;
   private final SystemConsumers consumerMultiplexer;
   private final Map<SystemStreamPartition, List<AsyncTaskWorker>> 
sspToTaskWorkerMapping;
+
   private final ExecutorService threadPool;
   private final CoordinatorRequests coordinatorRequests;
   private final Object latch;
@@ -136,7 +140,6 @@ public class AsyncRunLoop implements Runnable {
         long startNs = System.nanoTime();
 
         IncomingMessageEnvelope envelope = chooseEnvelope();
-
         long chooseNs = System.nanoTime();
 
         containerMetrics.chooseNs().update(chooseNs - startNs);
@@ -200,6 +203,7 @@ public class AsyncRunLoop implements Runnable {
     }
   }
 
+
   /**
    * Block the runloop thread if all tasks are busy. When a task worker 
finishes or window/commit completes,
    * it will resume the runloop.
@@ -273,6 +277,7 @@ public class AsyncRunLoop implements Runnable {
     WINDOW,
     COMMIT,
     PROCESS,
+    END_OF_STREAM,
     NO_OP
   }
 
@@ -284,11 +289,14 @@ public class AsyncRunLoop implements Runnable {
     private final TaskInstance<AsyncStreamTask> task;
     private final TaskCallbackManager callbackManager;
     private volatile AsyncTaskState state;
+    private volatile boolean completed = false;
+
 
     AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) {
       this.task = task;
       this.callbackManager = new TaskCallbackManager(this, task.metrics(), 
callbackTimer, callbackTimeoutMs);
-      this.state = new AsyncTaskState(task.taskName(), task.metrics());
+      Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
+      this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet);
     }
 
     private void init() {
@@ -317,6 +325,20 @@ public class AsyncRunLoop implements Runnable {
     }
 
     /**
+     * Returns those partitions of the task for which the consumer multiplexer 
has not reported end-of-stream.
+     * @param task the task instance whose input SSPs are filtered
+     * @return the subset of the task's SSPs that are not at end-of-stream.
+     */
+    private Set<SystemStreamPartition> 
getWorkingSSPSet(TaskInstance<AsyncStreamTask> task) {
+
+      Set<SystemStreamPartition> allPartitions = new 
HashSet<>(JavaConversions.asJavaSet(task.systemStreamPartitions()));
+
+      // filter only those SSPs that are not at end of stream.
+      Set<SystemStreamPartition> workingSSPSet = 
allPartitions.stream().filter(ssp -> 
!consumerMultiplexer.isEndOfStream(ssp)).collect(Collectors.toSet());
+      return workingSSPSet;
+    }
+
+    /**
      * Invoke next task operation based on its state
      */
     private void run() {
@@ -330,12 +352,36 @@ public class AsyncRunLoop implements Runnable {
         case COMMIT:
           commit();
           break;
+        case END_OF_STREAM:
+          endOfStream();
+          break;
         default:
           //no op
           break;
       }
     }
 
+    private void endOfStream() {
+      state.complete = true;
+      try {
+        ReadableCoordinator coordinator = new 
ReadableCoordinator(task.taskName());
+
+        task.endOfStream(coordinator);
+        // issue a request for shutdown of the task
+        coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+        coordinatorRequests.update(coordinator);
+
+        // invoke commit on the task - if the endOfStream callback had 
requested a final commit.
+        boolean needFinalCommit = 
coordinatorRequests.commitRequests().remove(task.taskName());
+        if (needFinalCommit) {
+          task.commit();
+        }
+      } finally {
+        resume();
+      }
+
+    }
+
     /**
      * Process asynchronously. The callback needs to be fired once the 
processing is done.
      */
@@ -428,8 +474,6 @@ public class AsyncRunLoop implements Runnable {
       }
     }
 
-
-
     /**
      * Task process completes successfully, update the offsets based on the 
high-water mark.
      * Then it will trigger the listener for task state change.
@@ -494,24 +538,46 @@ public class AsyncRunLoop implements Runnable {
   private final class AsyncTaskState {
     private volatile boolean needWindow = false;
     private volatile boolean needCommit = false;
+    private volatile boolean complete = false;
+    private volatile boolean endOfStream = false;
     private volatile boolean windowOrCommitInFlight = false;
     private final AtomicInteger messagesInFlight = new AtomicInteger(0);
     private final ArrayDeque<PendingEnvelope> pendingEnvelopQueue;
+    // SSPs this task instance is still processing; an SSP is removed once its end-of-stream envelope is seen.
+    private final Set<SystemStreamPartition> processingSspSet;
     private final TaskName taskName;
     private final TaskInstanceMetrics taskMetrics;
 
-    AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics) {
+    AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics, 
Set<SystemStreamPartition> sspSet) {
       this.taskName = taskName;
       this.taskMetrics = taskMetrics;
       this.pendingEnvelopQueue = new ArrayDeque<>();
+      this.processingSspSet = sspSet;
+    }
+
+
+    private boolean checkEndOfStream() {
+      PendingEnvelope pendingEnvelope = pendingEnvelopQueue.peek();
+
+      if (pendingEnvelope != null) {
+        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+
+        if (envelope.isEndOfStream()) {
+          SystemStreamPartition ssp = envelope.getSystemStreamPartition();
+          processingSspSet.remove(ssp);
+          pendingEnvelopQueue.remove();
+        }
+      }
+      return processingSspSet.isEmpty();
     }
 
     /**
      * Returns whether the task is ready to do process/window/commit.
      */
     private boolean isReady() {
+      endOfStream |= checkEndOfStream();
       needCommit |= coordinatorRequests.commitRequests().remove(taskName);
-      if (needWindow || needCommit) {
+      if (needWindow || needCommit || endOfStream) {
         // ready for window or commit only when no messages are in progress and
         // no window/commit in flight
         return messagesInFlight.get() == 0 && !windowOrCommitInFlight;
@@ -526,9 +592,13 @@ public class AsyncRunLoop implements Runnable {
      * Returns the next operation by this taskWorker
      */
     private WorkerOp nextOp() {
+
+      if (complete) return WorkerOp.NO_OP;
+
       if (isReady()) {
         if (needCommit) return WorkerOp.COMMIT;
         else if (needWindow) return WorkerOp.WINDOW;
+        else if (endOfStream) return WorkerOp.END_OF_STREAM;
         else if (!pendingEnvelopQueue.isEmpty()) return WorkerOp.PROCESS;
       }
       return WorkerOp.NO_OP;
@@ -577,6 +647,7 @@ public class AsyncRunLoop implements Runnable {
       log.debug("Task {} pending envelope count is {} after insertion.", 
taskName, queueSize);
     }
 
+
     /**
      * Fetch the pending envelope in the pending queue for the task to process.
      * Update the chooser for flow control on the SSP level. Once it's 
updated, the AsyncRunLoop

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
index 1fc6456..e2fea95 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
@@ -30,7 +30,7 @@ import org.apache.samza.system.IncomingMessageEnvelope;
  * the callbacks once it's done. If the thread pool is null, it follows the 
legacy
  * synchronous model to execute the tasks on the run loop thread.
  */
-public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, 
WindowableTask, ClosableTask {
+public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, 
WindowableTask, ClosableTask, EndOfStreamListenerTask {
   private final StreamTask wrappedTask;
   private final ExecutorService executor;
 
@@ -89,4 +89,11 @@ public class AsyncStreamTaskAdapter implements 
AsyncStreamTask, InitableTask, Wi
       ((ClosableTask) wrappedTask).close();
     }
   }
+
+  @Override
+  public void onEndOfStream(MessageCollector collector, TaskCoordinator 
coordinator) throws Exception {
+    if (wrappedTask instanceof EndOfStreamListenerTask) {
+      ((EndOfStreamListenerTask) wrappedTask).onEndOfStream(collector, 
coordinator);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 89f6857..b068856 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -31,6 +31,7 @@ import org.apache.samza.system.SystemConsumers
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.AsyncStreamTask
 import org.apache.samza.task.ClosableTask
+import org.apache.samza.task.EndOfStreamListenerTask
 import org.apache.samza.task.InitableTask
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.StreamTask
@@ -58,6 +59,7 @@ class TaskInstance[T](
   val exceptionHandler: TaskInstanceExceptionHandler = new 
TaskInstanceExceptionHandler) extends Logging {
   val isInitableTask = task.isInstanceOf[InitableTask]
   val isWindowableTask = task.isInstanceOf[WindowableTask]
+  val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
@@ -168,6 +170,14 @@ class TaskInstance[T](
     }
   }
 
+  def endOfStream(coordinator: ReadableCoordinator): Unit = {
+    if (isEndOfStreamListenerTask) {
+      exceptionHandler.maybeHandle {
+        task.asInstanceOf[EndOfStreamListenerTask].onEndOfStream(collector, 
coordinator);
+      }
+    }
+  }
+
   def window(coordinator: ReadableCoordinator) {
     if (isWindowableTask) {
       trace("Windowing for taskName: %s" format taskName)
@@ -220,7 +230,7 @@ class TaskInstance[T](
 
   override def toString() = "TaskInstance for class %s and taskName %s." 
format (task.getClass.getName, taskName)
 
-  def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, 
closable=%s]" format (taskName, isWindowableTask, isClosableTask)
+  def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, 
closable=%s endofstreamlistener=%s]" format (taskName, isWindowableTask, 
isClosableTask, isEndOfStreamListenerTask)
 
   /**
    * From the envelope, check if this SSP has catched up with the starting 
offset of the SSP

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index a8355b9..e2aed5b 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -25,13 +25,13 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConversions._
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.util.{Logging, TimerUtils}
-import org.apache.samza.system.chooser.MessageChooser
+import org.apache.samza.system.chooser.{DefaultChooser, MessageChooser}
 import org.apache.samza.SamzaException
-import java.util.HashMap
 import java.util.ArrayDeque
+import java.util.HashSet
+import java.util.HashMap
 import java.util.Queue
 import java.util.Set
-import java.util.HashSet
 
 object SystemConsumers {
   val DEFAULT_POLL_INTERVAL_MS = 50
@@ -130,6 +130,11 @@ class SystemConsumers (
   private val unprocessedMessagesBySSP = new HashMap[SystemStreamPartition, 
Queue[IncomingMessageEnvelope]]()
 
   /**
+   * Set of SSPs that are currently at end-of-stream.
+   */
+  private val endOfStreamSSPs = new HashSet[SystemStreamPartition]()
+
+  /**
    * A set of SystemStreamPartitions grouped by systemName. This is used as a
    * cache to figure out which SystemStreamPartitions we need to poll from the
    * underlying system consumer.
@@ -163,7 +168,6 @@ class SystemConsumers (
 
   def start {
     debug("Starting consumers.")
-
     emptySystemStreamPartitionsBySystem ++= unprocessedMessagesBySSP
       .keySet
       .groupBy(_.getSystem)
@@ -190,8 +194,16 @@ class SystemConsumers (
     chooser.stop
   }
 
+
   def register(systemStreamPartition: SystemStreamPartition, offset: String) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
+
+    if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) {
+      info("Stream : %s is already at end of stream" format 
(systemStreamPartition))
+      endOfStreamSSPs.add(systemStreamPartition)
+      return;
+    }
+
     metrics.registerSystemStreamPartition(systemStreamPartition)
     unprocessedMessagesBySSP.put(systemStreamPartition, new 
ArrayDeque[IncomingMessageEnvelope]())
     chooser.register(systemStreamPartition, offset)
@@ -203,6 +215,11 @@ class SystemConsumers (
     }
   }
 
+
+  def isEndOfStream(systemStreamPartition: SystemStreamPartition) = {
+    endOfStreamSSPs.contains(systemStreamPartition)
+  }
+
   def choose (updateChooser: Boolean = true): IncomingMessageEnvelope = {
     val envelopeFromChooser = chooser.choose
 
@@ -217,6 +234,11 @@ class SystemConsumers (
       } else {
         val systemStreamPartition = 
envelopeFromChooser.getSystemStreamPartition
 
+        if (envelopeFromChooser.isEndOfStream) {
+          info("End of stream reached for partition: %s" format 
systemStreamPartition)
+          endOfStreamSSPs.add(systemStreamPartition)
+        }
+
         trace("Chooser returned an incoming message envelope: %s" format 
envelopeFromChooser)
 
         // Ok to give the chooser a new message from this stream.
@@ -254,7 +276,8 @@ class SystemConsumers (
     val systemFetchSet = emptySystemStreamPartitionsBySystem.get(systemName)
 
     // Poll when at least one SSP in this system needs more messages.
-    if (systemFetchSet.size > 0) {
+
+    if (systemFetchSet != null && systemFetchSet.size > 0) {
       val consumer = consumers(systemName)
 
       trace("Fetching: %s" format systemFetchSet)
@@ -262,7 +285,6 @@ class SystemConsumers (
       
metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchSet.size)
 
       val systemStreamPartitionEnvelopes = consumer.poll(systemFetchSet, 
timeout)
-
       trace("Got incoming message envelopes: %s" format 
systemStreamPartitionEnvelopes)
 
       metrics.systemMessagesPerPoll(systemName).inc
@@ -307,7 +329,6 @@ class SystemConsumers (
 
     // Update last poll time so we don't poll too frequently.
     lastPollNs = clock()
-
     // Poll every system for new messages.
     consumers.keys.map(poll(_))
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 57a5da6..0d61814 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -393,10 +393,10 @@ public class TestContainerProcessManager {
   @Test
   public void testAppMasterWithFwk() {
     ContainerProcessManager taskManager = new ContainerProcessManager(
-      new MapConfig(config),
-      state,
-      new MetricsRegistryMap(),
-      manager
+        new MapConfig(config),
+        state,
+        new MetricsRegistryMap(),
+        manager
     );
     taskManager.start();
     SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index ca913de..3263e54 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -19,17 +19,23 @@
 
 package org.apache.samza.task;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstance;
@@ -38,13 +44,19 @@ import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.TestSystemConsumers;
+
 import org.junit.Before;
 import org.junit.Test;
+import scala.Option;
 import scala.collection.JavaConversions;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -73,6 +85,8 @@ public class TestAsyncRunLoop {
   IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp0, "0", 
"key0", "value0");
   IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp1, "1", 
"key1", "value1");
   IncomingMessageEnvelope envelope3 = new IncomingMessageEnvelope(ssp0, "1", 
"key0", "value0");
+  IncomingMessageEnvelope ssp0EndOfStream = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp0);
+  IncomingMessageEnvelope ssp1EndOfStream = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1);
 
   TestTask task0;
   TestTask task1;
@@ -90,14 +104,19 @@ public class TestAsyncRunLoop {
         containerMetrics);
   }
 
-  TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, 
TaskName taskName, SystemStreamPartition ssp) {
+  /**
+   * Builds a {@link TaskInstance} that owns exactly one input partition, using the caller's
+   * offset manager and consumer multiplexer so a test can plug in its own (for example a real
+   * {@link SystemConsumers} instead of the shared mock).
+   *
+   * <p>The config, task-instance collector and container context are Mockito mocks.
+   *
+   * @param task      the async task under test
+   * @param taskName  name of the task instance
+   * @param ssp       the single partition assigned to the task
+   * @param manager   offset manager handed to the task instance
+   * @param consumers consumer multiplexer handed to the task instance
+   * @return the constructed task instance
+   */
+  TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, 
TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, 
SystemConsumers consumers) {
     TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", 
new MetricsRegistryMap());
     scala.collection.immutable.Set<SystemStreamPartition> sspSet = 
JavaConversions.asScalaSet(Collections.singleton(ssp)).toSet();
     return new TaskInstance<AsyncStreamTask>(task, taskName, 
mock(Config.class), taskInstanceMetrics,
-        null, consumerMultiplexer, mock(TaskInstanceCollector.class), 
mock(SamzaContainerContext.class),
-        offsetManager, null, null, sspSet, new 
TaskInstanceExceptionHandler(taskInstanceMetrics, new 
scala.collection.immutable.HashSet<String>()));
+        null, consumers, mock(TaskInstanceCollector.class), 
mock(SamzaContainerContext.class),
+        manager, null, null, sspSet, new 
TaskInstanceExceptionHandler(taskInstanceMetrics, new 
scala.collection.immutable.HashSet<String>()));
   }
 
+  /**
+   * Convenience overload of the five-argument factory that wires in the shared test fixtures
+   * {@code offsetManager} and {@code consumerMultiplexer}.
+   */
+  TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp) {
+    final OffsetManager sharedOffsets = this.offsetManager;
+    final SystemConsumers sharedConsumers = this.consumerMultiplexer;
+    return createTaskInstance(task, taskName, ssp, sharedOffsets, sharedConsumers);
+  }
+
+
   ExecutorService callbackExecutor;
   void triggerCallback(final TestTask task, final TaskCallback callback, final 
boolean success) {
     callbackExecutor.submit(new Runnable() {
@@ -122,7 +141,8 @@ public class TestAsyncRunLoop {
     void run(TaskCallback callback);
   }
 
-  class TestTask implements AsyncStreamTask, WindowableTask {
+
+  class TestTask implements AsyncStreamTask, WindowableTask, 
EndOfStreamListenerTask {
     boolean shutdown = false;
     boolean commit = false;
     boolean success;
@@ -166,8 +186,14 @@ public class TestAsyncRunLoop {
         coordinator.shutdown(shutdownRequest);
       }
     }
+
+    @Override
+    public void onEndOfStream(MessageCollector collector, TaskCoordinator 
coordinator) {
+      coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    }
   }
 
+
   @Before
   public void setup() {
     executor = null;
@@ -180,7 +206,7 @@ public class TestAsyncRunLoop {
     offsetManager = mock(OffsetManager.class);
     shutdownRequest = TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER;
 
-    when(consumerMultiplexer.pollIntervalMs()).thenReturn(1000000);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
 
     tasks = new HashMap<>();
     task0 = new TestTask(true, true, false);
@@ -191,6 +217,7 @@ public class TestAsyncRunLoop {
     tasks.put(taskName1, t1);
   }
 
+
   @Test
   public void testProcessMultipleTasks() throws Exception {
     AsyncRunLoop runLoop = createRunLoop();
@@ -207,6 +234,7 @@ public class TestAsyncRunLoop {
     assertEquals(2L, containerMetrics.processes().getCount());
   }
 
+
   @Test
   public void testProcessInOrder() throws Exception {
     AsyncRunLoop runLoop = createRunLoop();
@@ -223,12 +251,10 @@ public class TestAsyncRunLoop {
     assertEquals(3L, containerMetrics.processes().getCount());
   }
 
-  @Test
-  public void testProcessOutOfOrder() throws Exception {
-    maxMessagesInFlight = 2;
 
+  private TestCode buildOutofOrderCallback() {
     final CountDownLatch latch = new CountDownLatch(1);
-    task0.code = new TestCode() {
+    return new TestCode() {
       @Override
       public void run(TaskCallback callback) {
         IncomingMessageEnvelope envelope = ((TaskCallbackImpl) 
callback).envelope;
@@ -246,6 +272,13 @@ public class TestAsyncRunLoop {
         }
       }
     };
+  }
+
+  @Test
+  public void testProcessOutOfOrder() throws Exception {
+    maxMessagesInFlight = 2;
+
+    task0.code = buildOutofOrderCallback();
 
     AsyncRunLoop runLoop = createRunLoop();
     
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
@@ -330,4 +363,153 @@ public class TestAsyncRunLoop {
     assertEquals(2L, containerMetrics.envelopes().getCount());
     assertEquals(2L, containerMetrics.processes().getCount());
   }
+
+  /**
+   * Two tasks each receive one regular envelope followed by an end-of-stream envelope for their
+   * partition. The run loop has to return on its own. End-of-stream envelopes are counted as
+   * envelopes but never reach process().
+   */
+  @Test
+  public void testEndOfStreamWithMultipleTasks() throws Exception {
+    task0 = new TestTask(true, true, false);
+    task1 = new TestTask(true, true, false);
+    t0 = createTaskInstance(task0, taskName0, ssp0);
+    t1 = createTaskInstance(task1, taskName1, ssp1);
+    tasks.put(taskName0, t0);
+    tasks.put(taskName1, t1);
+
+    AsyncRunLoop loop = createRunLoop();
+    // Data for ssp0 and ssp1, then end-of-stream for ssp0 and for ssp1.
+    when(consumerMultiplexer.choose(false))
+        .thenReturn(envelope0, envelope1, ssp0EndOfStream, ssp1EndOfStream);
+    loop.run();
+    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+
+    // One data envelope per task, each processed and completed exactly once.
+    assertEquals(1, task0.processed);
+    assertEquals(1, task0.completed.get());
+    assertEquals(1, task1.processed);
+    assertEquals(1, task1.completed.get());
+
+    // Four envelopes chosen in total; only the two data envelopes are processed.
+    assertEquals(4L, containerMetrics.envelopes().getCount());
+    assertEquals(2L, containerMetrics.processes().getCount());
+  }
+
+  /**
+   * task0 runs with two messages in flight and an out-of-order callback. Both partitions then
+   * reach end-of-stream. Every data envelope must still be processed and completed, and the run
+   * loop must return on its own.
+   */
+  @Test
+  public void testEndOfStreamWithOutOfOrderProcess() throws Exception {
+    maxMessagesInFlight = 2;
+    task0 = new TestTask(true, true, false);
+    task1 = new TestTask(true, true, false);
+    t0 = createTaskInstance(task0, taskName0, ssp0);
+    t1 = createTaskInstance(task1, taskName1, ssp1);
+    tasks.put(taskName0, t0);
+    tasks.put(taskName1, t1);
+
+    // buildOutofOrderCallback() creates its own CountDownLatch, so no local latch is needed.
+    task0.code = buildOutofOrderCallback();
+    AsyncRunLoop runLoop = createRunLoop();
+    when(consumerMultiplexer.choose(false))
+        .thenReturn(envelope0)
+        .thenReturn(envelope3)
+        .thenReturn(envelope1)
+        .thenReturn(null)
+        .thenReturn(ssp0EndOfStream)
+        .thenReturn(ssp1EndOfStream);
+
+    runLoop.run();
+
+    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+    assertEquals(2, task0.processed);
+    assertEquals(2, task0.completed.get());
+    assertEquals(1, task1.processed);
+    assertEquals(1, task1.completed.get());
+    // Three data envelopes plus two end-of-stream envelopes; only the data envelopes are processed.
+    assertEquals(5L, containerMetrics.envelopes().getCount());
+    assertEquals(3L, containerMetrics.processes().getCount());
+  }
+
+  /**
+   * Tasks whose process()/window() calls never commit are still checkpointed once each, because
+   * onEndOfStream() requests a commit for the current task.
+   */
+  @Test
+  public void testEndOfStreamCommitBehavior() throws Exception {
+    // Commits are disabled inside process() and window(); only end-of-stream commits.
+    task0 = new TestTask(true, false, false);
+    task1 = new TestTask(true, false, false);
+
+    t0 = createTaskInstance(task0, taskName0, ssp0);
+    t1 = createTaskInstance(task1, taskName1, ssp1);
+    tasks.put(taskName0, t0);
+    tasks.put(taskName1, t1);
+    AsyncRunLoop loop = createRunLoop();
+
+    // One data envelope per partition, a null (nothing chosen), then end-of-stream per partition.
+    when(consumerMultiplexer.choose(false))
+        .thenReturn(envelope0, envelope1, null, ssp0EndOfStream, ssp1EndOfStream);
+    loop.run();
+    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+
+    // verify() defaults to times(1): exactly one checkpoint per task.
+    verify(offsetManager).checkpoint(taskName0);
+    verify(offsetManager).checkpoint(taskName1);
+  }
+
+  /**
+   * Two tasks share one real {@link SystemConsumers} backed by a mocked {@link SystemConsumer}.
+   * task1's partition (ssp1) has starting offset
+   * {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} and never appears in a polled batch.
+   * task2's partition (ssp2) delivers two messages followed by an end-of-stream envelope.
+   */
+  @Test
+  public void testEndOfStreamOffsetManagement() throws Exception {
+    //explicitly configure to disable commits inside process or window calls and invoke commit from end of stream
+    TestTask mockStreamTask1 = new TestTask(true, false, false);
+    TestTask mockStreamTask2 = new TestTask(true, false, false);
+
+    Config config = new MapConfig();
+
+    Partition p1 = new Partition(1);
+    Partition p2 = new Partition(2);
+    SystemStreamPartition ssp1 = new SystemStreamPartition("system1", "stream1", p1);
+    SystemStreamPartition ssp2 = new SystemStreamPartition("system1", "stream2", p2);
+    IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp2, "1", "key1", "message1");
+    IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp2, "2", "key1", "message1");
+    IncomingMessageEnvelope envelope3 = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp2);
+
+    // Every poll returns the same batch: only ssp2, with two messages and then end-of-stream.
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> sspMap = new HashMap<>();
+    List<IncomingMessageEnvelope> messageList = new ArrayList<>();
+    messageList.add(envelope1);
+    messageList.add(envelope2);
+    messageList.add(envelope3);
+    sspMap.put(ssp2, messageList);
+
+    SystemConsumer mockConsumer = mock(SystemConsumer.class);
+    when(mockConsumer.poll((Set<SystemStreamPartition>) anyObject(), anyLong())).thenReturn(sspMap);
+
+    HashMap<String, SystemConsumer> systemConsumerMap = new HashMap<>();
+    systemConsumerMap.put("system1", mockConsumer);
+    SystemConsumers consumers = TestSystemConsumers.getSystemConsumers(systemConsumerMap);
+
+    TaskName taskName1 = new TaskName("task1");
+    TaskName taskName2 = new TaskName("task2");
+    Set<TaskName> taskNames = new HashSet<>();
+    taskNames.add(taskName1);
+    taskNames.add(taskName2);
+
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
+    when(offsetManager.getLastProcessedOffset(taskName1, ssp1)).thenReturn(Option.apply("3"));
+    when(offsetManager.getLastProcessedOffset(taskName2, ssp2)).thenReturn(Option.apply("0"));
+    when(offsetManager.getStartingOffset(taskName1, ssp1)).thenReturn(Option.apply(IncomingMessageEnvelope.END_OF_STREAM_OFFSET));
+    when(offsetManager.getStartingOffset(taskName2, ssp2)).thenReturn(Option.apply("1"));
+
+    TaskInstance<AsyncStreamTask> taskInstance1 = createTaskInstance(mockStreamTask1, taskName1, ssp1, offsetManager, consumers);
+    TaskInstance<AsyncStreamTask> taskInstance2 = createTaskInstance(mockStreamTask2, taskName2, ssp2, offsetManager, consumers);
+    Map<TaskName, TaskInstance<AsyncStreamTask>> tasks = new HashMap<>();
+    tasks.put(taskName1, taskInstance1);
+    tasks.put(taskName2, taskInstance2);
+
+    taskInstance1.registerConsumers();
+    taskInstance2.registerConsumers();
+    consumers.start();
+
+    AsyncRunLoop runLoop = new AsyncRunLoop(tasks,
+        executor,
+        consumers,
+        maxMessagesInFlight,
+        windowMs,
+        commitMs,
+        callbackTimeoutMs,
+        containerMetrics);
+
+    // run() is expected to return on its own once both partitions are finished; a hang is a failure.
+    runLoop.run();
+    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+
+    // ssp1 never appears in a polled batch, so task1 has nothing to process.
+    assertEquals(0, mockStreamTask1.processed);
+    // End-of-stream on ssp2 reaches onEndOfStream(), which commits the current task.
+    verify(offsetManager).checkpoint(taskName2);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index db2249b..b5d58e3 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -327,3 +327,9 @@ class TestSystemConsumers {
     def register { super.register(systemStreamPartition, "0") }
   }
 }
+
+object TestSystemConsumers {
+  /**
+   * Builds a [[SystemConsumers]] over the given consumers with a [[DefaultChooser]], so Java
+   * tests can create one from a `java.util.Map`. Any remaining constructor parameters take
+   * their defaults.
+   *
+   * @param consumers system consumers keyed by system name
+   * @return a new, not yet started, SystemConsumers
+   */
+  def getSystemConsumers(consumers: java.util.Map[String, SystemConsumer]) : 
SystemConsumers = {
+    // `toMap` on a java.util.Map needs an implicit Java-to-Scala conversion in scope
+    // (presumably a JavaConversions import elsewhere in this file - confirm).
+    new SystemConsumers(new DefaultChooser, consumers.toMap)
+  }
+}

Reply via email to