http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index deb657a..aae12a0 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -18,11 +18,11 @@
 package org.apache.streams.local.queues;
 
 import org.apache.streams.local.builders.LocalStreamBuilder;
+
+import org.apache.commons.lang.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
 
-import javax.management.*;
 import java.lang.management.ManagementFactory;
 import java.util.Collection;
 import java.util.Iterator;
@@ -32,6 +32,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
 
 /**
  * A {@link java.util.concurrent.BlockingQueue} implementation that allows the 
measure measurement of how
@@ -44,437 +50,437 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class ThroughputQueue<E> implements BlockingQueue<E>, 
ThroughputQueueMXBean {
 
-    public static final String NAME_TEMPLATE = 
"org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s";
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ThroughputQueue.class);
-
-    private BlockingQueue<ThroughputElement<E>> underlyingQueue;
-    private AtomicLong elementsAdded;
-    private AtomicLong elementsRemoved;
-    private AtomicLong startTime;
-    private AtomicLong totalQueueTime;
-    private long maxQueuedTime;
-    private volatile boolean active;
-    private ReadWriteLock maxQueueTimeLock;
-
-    /**
-     * Creates an unbounded, unregistered {@code ThroughputQueue}
-     */
-    public ThroughputQueue() {
-        this(-1, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+  public static final String NAME_TEMPLATE = 
"org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThroughputQueue.class);
+
+  private BlockingQueue<ThroughputElement<E>> underlyingQueue;
+  private AtomicLong elementsAdded;
+  private AtomicLong elementsRemoved;
+  private AtomicLong startTime;
+  private AtomicLong totalQueueTime;
+  private long maxQueuedTime;
+  private volatile boolean active;
+  private ReadWriteLock maxQueueTimeLock;
+
+  /**
+   * Creates an unbounded, unregistered {@code ThroughputQueue}
+   */
+  public ThroughputQueue() {
+    this(-1, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+  }
+
+  /**
+   *
+   * @param streamIdentifier
+   * @param startedAt
+   */
+  public ThroughputQueue(String streamIdentifier, long startedAt) {
+    this(-1, null, streamIdentifier, startedAt);
+  }
+
+  /**
+   * Creates a bounded, unregistered {@code ThroughputQueue}
+   *
+   * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
+   */
+  public ThroughputQueue(int maxSize) {
+    this(maxSize, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+  }
+
+  /**
+   *
+   * @param maxSize
+   * @param streamIdentifier
+   * @param startedAt
+   */
+  public ThroughputQueue(int maxSize, String streamIdentifier, long startedAt) 
{
+    this(maxSize, null, streamIdentifier, startedAt);
+  }
+
+  /**
+   * Creates an unbounded, registered {@code ThroughputQueue}
+   *
+   * @param id unique id for this queue to be registered with. if id == NULL 
then not registered
+   */
+  public ThroughputQueue(String id) {
+    this(-1, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+  }
+
+  /**
+   *
+   * @param id
+   * @param streamIdentifier
+   * @param startedAt
+   */
+  public ThroughputQueue(String id, String streamIdentifier, long startedAt) {
+    this(-1, id, streamIdentifier, startedAt);
+  }
+
+  /**
+   *
+   * @param maxSize
+   * @param id
+   */
+  public ThroughputQueue(int maxSize, String id) {
+    this(maxSize, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+
+  }
+
+  /**
+   * Creates a bounded, registered {@code ThroughputQueue}
+   *
+   * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
+   * @param id      unique id for this queue to be registered with. if id == 
NULL then not registered
+   */
+  public ThroughputQueue(int maxSize, String id, String streamIdentifier, long 
startedAt) {
+    if (maxSize < 1) {
+      this.underlyingQueue = new LinkedBlockingQueue<>();
+    } else {
+      this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
+    }
+    this.elementsAdded = new AtomicLong(0);
+    this.elementsRemoved = new AtomicLong(0);
+    this.startTime = new AtomicLong(-1);
+    this.active = false;
+    this.maxQueuedTime = 0;
+    this.maxQueueTimeLock = new ReentrantReadWriteLock();
+    this.totalQueueTime = new AtomicLong(0);
+    if (id != null) {
+      try {
+        ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id, 
streamIdentifier, startedAt));
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        mbs.registerMBean(this, name);
+      } catch (MalformedObjectNameException | InstanceAlreadyExistsException | 
MBeanRegistrationException | NotCompliantMBeanException e) {
+        LOGGER.error("Failed to register MXBean : {}", e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public boolean add(E e) {
+    if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
+      internalAddElement();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean offer(E e) {
+    if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) {
+      internalAddElement();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void put(E e) throws InterruptedException {
+    this.underlyingQueue.put(new ThroughputElement<E>(e));
+    internalAddElement();
+  }
+
+  @Override
+  public boolean offer(E e, long timeout, TimeUnit unit) throws 
InterruptedException {
+    if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, 
unit)) {
+      internalAddElement();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public E take() throws InterruptedException {
+    ThroughputElement<E> e = this.underlyingQueue.take();
+    internalRemoveElement(e);
+    return e.getElement();
+  }
+
+  @Override
+  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+    ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
+    if(e != null) {
+      internalRemoveElement(e);
+      return e.getElement();
+    }
+    return null;
+  }
+
+  @Override
+  public int remainingCapacity() {
+    return this.underlyingQueue.remainingCapacity();
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    try {
+      return this.underlyingQueue.remove(new ThroughputElement<E>((E) o));
+    } catch (ClassCastException cce) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    try {
+      return this.underlyingQueue.contains(new ThroughputElement<E>((E) o));
+    } catch (ClassCastException cce) {
+      return false;
+    }
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c, int maxElements) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public E remove() {
+    ThroughputElement<E> e = this.underlyingQueue.remove();
+    if(e != null) {
+      internalRemoveElement(e);
+      return e.getElement();
+    }
+    return null;
+  }
+
+  @Override
+  public E poll() {
+    ThroughputElement<E> e = this.underlyingQueue.poll();
+    if(e != null) {
+      internalRemoveElement(e);
+      return e.getElement();
+    }
+    return null;
+  }
+
+  @Override
+  public E element() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public E peek() {
+    ThroughputElement<E> e = this.underlyingQueue.peek();
+    if( e != null) {
+      return e.getElement();
+    }
+    return null;
+  }
+
+  @Override
+  public int size() {
+    return this.underlyingQueue.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return this.underlyingQueue.isEmpty();
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public <T> T[] toArray(T[] a) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends E> c) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void clear() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public long getCurrentSize() {
+    return this.elementsAdded.get() - this.elementsRemoved.get();
+  }
+
+  /**
+   * If elements have been removed from the queue or no elements have been 
added, it returns the average wait time
+   * in milliseconds. If elements have been added, but none have been removed, 
it returns the time waited by the first
+   * element in the queue.
+   *
+   * @return the average wait time in milliseconds
+   */
+  @Override
+  public double getAvgWait() {
+    if (this.elementsRemoved.get() == 0) {
+      if (this.getCurrentSize() > 0) {
+        return this.underlyingQueue.peek().getWaited();
+      } else {
+        return 0.0;
+      }
+    } else {
+      return (double) this.totalQueueTime.get() / (double) 
this.elementsRemoved.get();
+    }
+  }
+
+  @Override
+  public long getMaxWait() {
+    ThroughputElement<E> e = this.underlyingQueue.peek();
+    long max = -1;
+    try {
+      this.maxQueueTimeLock.readLock().lock();
+      if (e != null && e.getWaited() > this.maxQueuedTime) {
+        max = e.getWaited();
+      } else {
+        max = this.maxQueuedTime;
+      }
+    } finally {
+      this.maxQueueTimeLock.readLock().unlock();
+    }
+    return max;
+  }
+
+  @Override
+  public long getRemoved() {
+    return this.elementsRemoved.get();
+  }
+
+  @Override
+  public long getAdded() {
+    return this.elementsAdded.get();
+  }
+
+  @Override
+  public double getThroughput() {
+    if (active) {
+      return this.elementsRemoved.get() / ((System.currentTimeMillis() - 
this.startTime.get()) / 1000.0);
+    }
+    return 0.0;
+  }
+
+  /**
+   * Handles updating the stats whenever elements are added to the queue
+   */
+  private void internalAddElement() {
+    this.elementsAdded.incrementAndGet();
+    synchronized (this) {
+      if (!this.active) {
+        this.startTime.set(System.currentTimeMillis());
+        this.active = true;
+      }
+    }
+  }
+
+  /**
+   * Handle updating the stats whenever elements are removed from the queue
+   * @param e Element removed
+   */
+  private void internalRemoveElement(ThroughputElement<E> e) {
+    if(e != null) {
+      this.elementsRemoved.incrementAndGet();
+      Long queueTime = e.getWaited();
+      this.totalQueueTime.addAndGet(queueTime);
+      boolean unlocked = false;
+      try {
+        this.maxQueueTimeLock.readLock().lock();
+        if (this.maxQueuedTime < queueTime) {
+          this.maxQueueTimeLock.readLock().unlock();
+          unlocked = true;
+          try {
+            this.maxQueueTimeLock.writeLock().lock();
+            this.maxQueuedTime = queueTime;
+          } finally {
+            this.maxQueueTimeLock.writeLock().unlock();
+          }
+        }
+      } finally {
+        if (!unlocked)
+          this.maxQueueTimeLock.readLock().unlock();
+      }
     }
+  }
 
-    /**
-     *
-     * @param streamIdentifier
-     * @param startedAt
-     */
-    public ThroughputQueue(String streamIdentifier, long startedAt) {
-        this(-1, null, streamIdentifier, startedAt);
-    }
 
-    /**
-     * Creates a bounded, unregistered {@code ThroughputQueue}
-     *
-     * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
-     */
-    public ThroughputQueue(int maxSize) {
-        this(maxSize, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
-    }
+  /**
+   * Element wrapper to measure time waiting on the queue
+   *
+   * @param <E>
+   */
+  private class ThroughputElement<E> {
 
-    /**
-     *
-     * @param maxSize
-     * @param streamIdentifier
-     * @param startedAt
-     */
-    public ThroughputQueue(int maxSize, String streamIdentifier, long 
startedAt) {
-        this(maxSize, null, streamIdentifier, startedAt);
-    }
-
-    /**
-     * Creates an unbounded, registered {@code ThroughputQueue}
-     *
-     * @param id unique id for this queue to be registered with. if id == NULL 
then not registered
-     */
-    public ThroughputQueue(String id) {
-        this(-1, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
-    }
+    private long queuedTime;
+    private E element;
 
-    /**
-     *
-     * @param id
-     * @param streamIdentifier
-     * @param startedAt
-     */
-    public ThroughputQueue(String id, String streamIdentifier, long startedAt) 
{
-        this(-1, id, streamIdentifier, startedAt);
+    protected ThroughputElement(E element) {
+      this.element = element;
+      this.queuedTime = System.currentTimeMillis();
     }
 
     /**
+     * Get the time this element has been waiting on the queue.
+     * current time - time element was queued
      *
-     * @param maxSize
-     * @param id
+     * @return time this element has been waiting on the queue in milliseconds
      */
-    public ThroughputQueue(int maxSize, String id) {
-        this(maxSize, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
-
+    public long getWaited() {
+      return System.currentTimeMillis() - this.queuedTime;
     }
 
     /**
-     * Creates a bounded, registered {@code ThroughputQueue}
+     * Get the queued element
      *
-     * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
-     * @param id      unique id for this queue to be registered with. if id == 
NULL then not registered
+     * @return the element
      */
-    public ThroughputQueue(int maxSize, String id, String streamIdentifier, 
long startedAt) {
-        if (maxSize < 1) {
-            this.underlyingQueue = new LinkedBlockingQueue<>();
-        } else {
-            this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
-        }
-        this.elementsAdded = new AtomicLong(0);
-        this.elementsRemoved = new AtomicLong(0);
-        this.startTime = new AtomicLong(-1);
-        this.active = false;
-        this.maxQueuedTime = 0;
-        this.maxQueueTimeLock = new ReentrantReadWriteLock();
-        this.totalQueueTime = new AtomicLong(0);
-        if (id != null) {
-            try {
-                ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, 
id, streamIdentifier, startedAt));
-                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                mbs.registerMBean(this, name);
-            } catch (MalformedObjectNameException | 
InstanceAlreadyExistsException | MBeanRegistrationException | 
NotCompliantMBeanException e) {
-                LOGGER.error("Failed to register MXBean : {}", e);
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    @Override
-    public boolean add(E e) {
-        if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
-            internalAddElement();
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean offer(E e) {
-        if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) {
-            internalAddElement();
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public void put(E e) throws InterruptedException {
-        this.underlyingQueue.put(new ThroughputElement<E>(e));
-        internalAddElement();
-    }
-
-    @Override
-    public boolean offer(E e, long timeout, TimeUnit unit) throws 
InterruptedException {
-        if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, 
unit)) {
-            internalAddElement();
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public E take() throws InterruptedException {
-        ThroughputElement<E> e = this.underlyingQueue.take();
-        internalRemoveElement(e);
-        return e.getElement();
-    }
-
-    @Override
-    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
-        ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
-        if(e != null) {
-            internalRemoveElement(e);
-            return e.getElement();
-        }
-        return null;
-    }
-
-    @Override
-    public int remainingCapacity() {
-        return this.underlyingQueue.remainingCapacity();
-    }
-
-    @Override
-    public boolean remove(Object o) {
-        try {
-            return this.underlyingQueue.remove(new ThroughputElement<E>((E) 
o));
-        } catch (ClassCastException cce) {
-            return false;
-        }
-    }
-
-    @Override
-    public boolean contains(Object o) {
-        try {
-            return this.underlyingQueue.contains(new ThroughputElement<E>((E) 
o));
-        } catch (ClassCastException cce) {
-            return false;
-        }
-    }
-
-    @Override
-    public int drainTo(Collection<? super E> c) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public int drainTo(Collection<? super E> c, int maxElements) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public E remove() {
-        ThroughputElement<E> e = this.underlyingQueue.remove();
-        if(e != null) {
-            internalRemoveElement(e);
-            return e.getElement();
-        }
-        return null;
-    }
-
-    @Override
-    public E poll() {
-        ThroughputElement<E> e = this.underlyingQueue.poll();
-        if(e != null) {
-            internalRemoveElement(e);
-            return e.getElement();
-        }
-        return null;
+    public E getElement() {
+      return this.element;
     }
 
-    @Override
-    public E element() {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public E peek() {
-        ThroughputElement<E> e = this.underlyingQueue.peek();
-        if( e != null) {
-            return e.getElement();
-        }
-        return null;
-    }
-
-    @Override
-    public int size() {
-        return this.underlyingQueue.size();
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return this.underlyingQueue.isEmpty();
-    }
-
-    @Override
-    public Iterator<E> iterator() {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public Object[] toArray() {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public <T> T[] toArray(T[] a) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public boolean containsAll(Collection<?> c) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public boolean addAll(Collection<? extends E> c) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public boolean removeAll(Collection<?> c) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public boolean retainAll(Collection<?> c) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public void clear() {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public long getCurrentSize() {
-        return this.elementsAdded.get() - this.elementsRemoved.get();
-    }
 
     /**
-     * If elements have been removed from the queue or no elements have been 
added, it returns the average wait time
-     * in milliseconds. If elements have been added, but none have been 
removed, it returns the time waited by the first
-     * element in the queue.
-     *
-     * @return the average wait time in milliseconds
+     * Measures equality by the element and ignores the queued time
+     * @param obj
+     * @return
      */
     @Override
-    public double getAvgWait() {
-        if (this.elementsRemoved.get() == 0) {
-            if (this.getCurrentSize() > 0) {
-                return this.underlyingQueue.peek().getWaited();
-            } else {
-                return 0.0;
-            }
+    public boolean equals(Object obj) {
+      if(obj instanceof ThroughputElement && obj != null) {
+        ThroughputElement that = (ThroughputElement) obj;
+        if(that.getElement() == null && this.getElement() == null) {
+          return true;
+        } else if(that.getElement() != null) {
+          return that.getElement().equals(this.getElement());
         } else {
-            return (double) this.totalQueueTime.get() / (double) 
this.elementsRemoved.get();
-        }
-    }
-
-    @Override
-    public long getMaxWait() {
-        ThroughputElement<E> e = this.underlyingQueue.peek();
-        long max = -1;
-        try {
-            this.maxQueueTimeLock.readLock().lock();
-            if (e != null && e.getWaited() > this.maxQueuedTime) {
-                max = e.getWaited();
-            } else {
-                max = this.maxQueuedTime;
-            }
-        } finally {
-            this.maxQueueTimeLock.readLock().unlock();
-        }
-        return max;
-    }
-
-    @Override
-    public long getRemoved() {
-        return this.elementsRemoved.get();
-    }
-
-    @Override
-    public long getAdded() {
-        return this.elementsAdded.get();
-    }
-
-    @Override
-    public double getThroughput() {
-        if (active) {
-            return this.elementsRemoved.get() / ((System.currentTimeMillis() - 
this.startTime.get()) / 1000.0);
-        }
-        return 0.0;
-    }
-
-    /**
-     * Handles updating the stats whenever elements are added to the queue
-     */
-    private void internalAddElement() {
-        this.elementsAdded.incrementAndGet();
-        synchronized (this) {
-            if (!this.active) {
-                this.startTime.set(System.currentTimeMillis());
-                this.active = true;
-            }
-        }
-    }
-
-    /**
-     * Handle updating the stats whenever elements are removed from the queue
-     * @param e Element removed
-     */
-    private void internalRemoveElement(ThroughputElement<E> e) {
-        if(e != null) {
-            this.elementsRemoved.incrementAndGet();
-            Long queueTime = e.getWaited();
-            this.totalQueueTime.addAndGet(queueTime);
-            boolean unlocked = false;
-            try {
-                this.maxQueueTimeLock.readLock().lock();
-                if (this.maxQueuedTime < queueTime) {
-                    this.maxQueueTimeLock.readLock().unlock();
-                    unlocked = true;
-                    try {
-                        this.maxQueueTimeLock.writeLock().lock();
-                        this.maxQueuedTime = queueTime;
-                    } finally {
-                        this.maxQueueTimeLock.writeLock().unlock();
-                    }
-                }
-            } finally {
-                if (!unlocked)
-                    this.maxQueueTimeLock.readLock().unlock();
-            }
-        }
-    }
-
-
-    /**
-     * Element wrapper to measure time waiting on the queue
-     *
-     * @param <E>
-     */
-    private class ThroughputElement<E> {
-
-        private long queuedTime;
-        private E element;
-
-        protected ThroughputElement(E element) {
-            this.element = element;
-            this.queuedTime = System.currentTimeMillis();
-        }
-
-        /**
-         * Get the time this element has been waiting on the queue.
-         * current time - time element was queued
-         *
-         * @return time this element has been waiting on the queue in 
milliseconds
-         */
-        public long getWaited() {
-            return System.currentTimeMillis() - this.queuedTime;
-        }
-
-        /**
-         * Get the queued element
-         *
-         * @return the element
-         */
-        public E getElement() {
-            return this.element;
-        }
-
-
-        /**
-         * Measures equality by the element and ignores the queued time
-         * @param obj
-         * @return
-         */
-        @Override
-        public boolean equals(Object obj) {
-            if(obj instanceof ThroughputElement && obj != null) {
-                ThroughputElement that = (ThroughputElement) obj;
-                if(that.getElement() == null && this.getElement() == null) {
-                    return true;
-                } else if(that.getElement() != null) {
-                    return that.getElement().equals(this.getElement());
-                } else {
-                    return false;
-                }
-            }
-            return false;
+          return false;
         }
+      }
+      return false;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
index 571a035..9cc4593 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
@@ -17,50 +17,48 @@
  */
 package org.apache.streams.local.queues;
 
-import javax.management.MXBean;
-
 /**
  * MXBean capable queue that monitors the throughput of the queue
  */
 public interface ThroughputQueueMXBean {
 
-    /**
-     * Returns the number of items on the queue.
-     * @return number of items on queue
-     */ 
-    public long getCurrentSize();
+  /**
+   * Returns the number of items on the queue.
+   * @return number of items on queue
+   */
+  public long getCurrentSize();
 
-    /**
-     * Get the average time an item spends in queue in milliseconds
-     * @return average time an item spends in queue in milliseconds
-     */
-    public double getAvgWait();
+  /**
+   * Get the average time an item spends in queue in milliseconds
+   * @return average time an item spends in queue in milliseconds
+   */
+  public double getAvgWait();
 
-    /**
-     * Get the maximum time an item has spent on the queue before being 
removed from the queue.
-     * @return the maximum time an item has spent on the queue
-     */
-    public long getMaxWait();
+  /**
+   * Get the maximum time an item has spent on the queue before being removed 
from the queue.
+   * @return the maximum time an item has spent on the queue
+   */
+  public long getMaxWait();
 
-    /**
-     * Get the number of items that have been removed from this queue
-     * @return number of items that have been removed from the queue
-     */
-    public long getRemoved();
+  /**
+   * Get the number of items that have been removed from this queue
+   * @return number of items that have been removed from the queue
+   */
+  public long getRemoved();
 
-    /**
-     * Get the number of items that have been added to the queue
-     * @return number of items that have been added to the queue
-     */
-    public long getAdded();
+  /**
+   * Get the number of items that have been added to the queue
+   * @return number of items that have been added to the queue
+   */
+  public long getAdded();
 
-    /**
-     * Get the the throughput of the queue measured by the number of items 
removed from the queue
-     * dived by the time the queue has been active.
-     * Active time starts once the first item has been placed on the queue
-     * @return throughput of queue. items/sec, items removed / time active
-     */
-    public double getThroughput();
+  /**
+   * Get the the throughput of the queue measured by the number of items 
removed from the queue
+   * dived by the time the queue has been active.
+   * Active time starts once the first item has been placed on the queue
+   * @return throughput of queue. items/sec, items removed / time active
+   */
+  public double getThroughput();
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 8bc7769..5f4bfdb 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -18,200 +18,202 @@
 
 package org.apache.streams.local.tasks;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.SerializationUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
+ * BaseStreamsTask is the primary abstract StreamsTask.
  */
 public abstract class BaseStreamsTask implements StreamsTask {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseStreamsTask.class);
-
-    private List<BlockingQueue<StreamsDatum>> inQueues = new 
ArrayList<BlockingQueue<StreamsDatum>>();
-    private List<BlockingQueue<StreamsDatum>> outQueues = new 
LinkedList<BlockingQueue<StreamsDatum>>();
-    private int inIndex = 0;
-    private ObjectMapper mapper;
-    protected StreamsConfiguration streamConfig;
-
-    public BaseStreamsTask(StreamsConfiguration config) {
-        this.mapper = StreamsJacksonMapper.getInstance();
-        this.mapper.registerSubtypes(Activity.class);
-        if( config != null )
-            this.streamConfig = config;
-        else
-            this.streamConfig = StreamsConfigurator.detectConfiguration();
-
-        setStartedAt();
-    }
-
-
-    @Override
-    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
-        this.inQueues.add(inputQueue);
-    }
-
-    @Override
-    public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) {
-        this.outQueues.add(outputQueue);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseStreamsTask.class);
+
+  private List<BlockingQueue<StreamsDatum>> inQueues = new 
ArrayList<BlockingQueue<StreamsDatum>>();
+  private List<BlockingQueue<StreamsDatum>> outQueues = new 
LinkedList<BlockingQueue<StreamsDatum>>();
+  private int inIndex = 0;
+  private ObjectMapper mapper;
+  protected StreamsConfiguration streamConfig;
+
+  public BaseStreamsTask(StreamsConfiguration config) {
+    this.mapper = StreamsJacksonMapper.getInstance();
+    this.mapper.registerSubtypes(Activity.class);
+    if( config != null ) {
+      this.streamConfig = config;
+    } else {
+      this.streamConfig = StreamsConfigurator.detectConfiguration();
     }
-
-    @Override
-    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
-        return this.inQueues;
+    setStartedAt();
+  }
+
+  @Override
+  public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
+    this.inQueues.add(inputQueue);
+  }
+
+  @Override
+  public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) {
+    this.outQueues.add(outputQueue);
+  }
+
+  @Override
+  public List<BlockingQueue<StreamsDatum>> getInputQueues() {
+    return this.inQueues;
+  }
+
+  @Override
+  public List<BlockingQueue<StreamsDatum>> getOutputQueues() {
+    return this.outQueues;
+  }
+
+  /**
+   * SHOULD NOT BE NECCESARY, WILL REMOVE.
+   * Round Robins through input queues to get the next StreamsDatum. If all 
input queues are empty, it will return null.
+   * @return the next StreamsDatum or null if all input queues are empty.
+   */
+  @Deprecated
+  protected StreamsDatum getNextDatum() {
+    int startIndex = this.inIndex;
+    int index = startIndex;
+    StreamsDatum datum = null;
+    do {
+      datum = this.inQueues.get(index).poll();
+      index = getNextInputQueueIndex();
+    } while( datum == null && startIndex != index);
+    return datum;
+  }
+
+  /**
+   * Adds a StreamDatum to the outgoing queues.  If there are multiple queues, 
it uses serialization to create
+   * clones of the datum and adds a new clone to each queue.
+   * @param datum
+   */
+  protected void addToOutgoingQueue(StreamsDatum datum) throws 
InterruptedException{
+    if(this.outQueues.size() == 1) {
+      outQueues.get(0).put(datum);
     }
-
-    @Override
-    public List<BlockingQueue<StreamsDatum>> getOutputQueues() {
-        return this.outQueues;
-    }
-
-    /**
-     * SHOULD NOT BE NECCESARY, WILL REMOVE.
-     * Round Robins through input queues to get the next StreamsDatum. If all 
input queues are empty, it will return null.
-     * @return the next StreamsDatum or null if all input queues are empty.
-     */
-    @Deprecated
-    protected StreamsDatum getNextDatum() {
-        int startIndex = this.inIndex;
-        int index = startIndex;
-        StreamsDatum datum = null;
-        do {
-            datum = this.inQueues.get(index).poll();
-            index = getNextInputQueueIndex();
-        } while( datum == null && startIndex != index);
-        return datum;
-    }
-
-    /**
-     * Adds a StreamDatum to the outgoing queues.  If there are multiple 
queues, it uses serialization to create
-     * clones of the datum and adds a new clone to each queue.
-     * @param datum
-     */
-    protected void addToOutgoingQueue(StreamsDatum datum) throws 
InterruptedException{
-        if(this.outQueues.size() == 1) {
-            outQueues.get(0).put(datum);
-        }
-        else {
-            List<BlockingQueue<StreamsDatum>> toOutput = 
Lists.newLinkedList(this.outQueues);
-            while(!toOutput.isEmpty()) {
-                for (BlockingQueue<StreamsDatum> queue : toOutput) {
-                    StreamsDatum newDatum = cloneStreamsDatum(datum);
-                    if (newDatum != null) {
-                        if (queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) 
{
-                            toOutput.remove(queue);
-                        }
-                    }
-                }
+    else {
+      List<BlockingQueue<StreamsDatum>> toOutput = 
Lists.newLinkedList(this.outQueues);
+      while(!toOutput.isEmpty()) {
+        for (BlockingQueue<StreamsDatum> queue : toOutput) {
+          StreamsDatum newDatum = cloneStreamsDatum(datum);
+          if (newDatum != null) {
+            if (queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) {
+              toOutput.remove(queue);
             }
+          }
         }
+      }
     }
+  }
 
-    @Override
-    public boolean isWaiting() {
-        if(this.inQueues == null || this.inQueues.size() == 0) {
-            return true;
-        }
-        boolean empty = true;
-        for(Queue queue : this.inQueues) {
-            empty = empty && queue.isEmpty();
-        }
-        return empty;
+  @Override
+  public boolean isWaiting() {
+    if(this.inQueues == null || this.inQueues.size() == 0) {
+      return true;
     }
-
-    /**
-     * //TODO LOCAL MODE HACK. Need to fix
-     * In order for our data streams to ported to other data flow frame 
works(Storm, Hadoop, Spark, etc) we need to be able to
-     * enforce the serialization required by each framework.  This needs some 
thought and design before a final solution is
-     * made.
-     *
-     * In order to be able to copy/clone StreamDatums the orginal idea was to 
force all StreamsDatums to be java serializable.
-     * This was seen as unacceptable for local mode.  So until we come up with 
a solution to enforce serialization and be
-     * compatiable across multiple frame works, this hack is in place.
-     *
-     * If datum.document is Serializable, we use serialization to clone a new 
copy.  If it is not Serializable we attempt
-     * different methods using an com.fasterxml.jackson.databind.ObjectMapper 
to copy/clone the StreamsDatum. If the object
-     * is not clonable by these methods, an error is reported to the logging 
and a NULL object is returned.
-     *
-     * @param datum
-     * @return
-     */
-    protected StreamsDatum cloneStreamsDatum(StreamsDatum datum) {
-        try {
-
-            if(datum.document instanceof ObjectNode) {
-                return copyMetaData(datum, new StreamsDatum(((ObjectNode) 
datum.document).deepCopy(), datum.timestamp, datum.sequenceid));
-            }
-            else if(datum.document instanceof Activity) {
-
-                return copyMetaData(datum, new 
StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document),
 Activity.class),
-                                        datum.timestamp,
-                                        datum.sequenceid));
-            }
+    boolean empty = true;
+    for(Queue queue : this.inQueues) {
+      empty = empty && queue.isEmpty();
+    }
+    return empty;
+  }
+
+  /**
+   * //TODO LOCAL MODE HACK. Need to fix
+   * In order for our data streams to ported to other data flow frame 
works(Storm, Hadoop, Spark, etc) we need to be able to
+   * enforce the serialization required by each framework.  This needs some 
thought and design before a final solution is
+   * made.
+   *
+   * In order to be able to copy/clone StreamDatums the orginal idea was to 
force all StreamsDatums to be java serializable.
+   * This was seen as unacceptable for local mode.  So until we come up with a 
solution to enforce serialization and be
+   * compatiable across multiple frame works, this hack is in place.
+   *
+   * If datum.document is Serializable, we use serialization to clone a new 
copy.  If it is not Serializable we attempt
+   * different methods using an com.fasterxml.jackson.databind.ObjectMapper to 
copy/clone the StreamsDatum. If the object
+   * is not clonable by these methods, an error is reported to the logging and 
a NULL object is returned.
+   *
+   * @param datum
+   * @return
+   */
+  protected StreamsDatum cloneStreamsDatum(StreamsDatum datum) {
+    try {
+
+      if(datum.document instanceof ObjectNode) {
+        return copyMetaData(datum, new StreamsDatum(((ObjectNode) 
datum.document).deepCopy(), datum.timestamp, datum.sequenceid));
+      }
+      else if(datum.document instanceof Activity) {
+
+        return copyMetaData(datum, new 
StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document),
 Activity.class),
+            datum.timestamp,
+            datum.sequenceid));
+      }
 //            else if(this.mapper.canSerialize(datum.document.getClass())){
 //                return new 
StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document),
 datum.document.getClass()),
 //                                        datum.timestamp,
 //                                        datum.sequenceid);
 //            }
 
-            else if(datum.document instanceof Serializable) {
-                return (StreamsDatum) 
SerializationUtil.cloneBySerialization(datum);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to clone/copy StreamsDatum : 
{}", e);
-        }
-        LOGGER.error("Failed to clone/copy StreamsDatum with document of class 
: {}", datum.document.getClass().getName());
-        return null;
+      else if(datum.document instanceof Serializable) {
+        return (StreamsDatum) SerializationUtil.cloneBySerialization(datum);
+      }
+    } catch (Exception e) {
+      LOGGER.error("Exception while trying to clone/copy StreamsDatum : {}", 
e);
     }
-
-    private int getNextInputQueueIndex() {
-        ++this.inIndex;
-        if(this.inIndex >= this.inQueues.size()) {
-            this.inIndex = 0;
-        }
-        return this.inIndex;
+    LOGGER.error("Failed to clone/copy StreamsDatum with document of class : 
{}", datum.document.getClass().getName());
+    return null;
+  }
+
+  private int getNextInputQueueIndex() {
+    ++this.inIndex;
+    if(this.inIndex >= this.inQueues.size()) {
+      this.inIndex = 0;
     }
-
-    private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum 
copyTo) {
-        Map<String, Object> fromMeta = copyFrom.getMetadata();
-        Map<String, Object> toMeta = copyTo.getMetadata();
-        for(String key : fromMeta.keySet()) {
-            Object value = fromMeta.get(key);
-            if(value instanceof Serializable)
-                toMeta.put(key, SerializationUtil.cloneBySerialization(value));
-            else //hope for the best - should be serializable
-                toMeta.put(key, value);
-        }
-        return copyTo;
+    return this.inIndex;
+  }
+
+  private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum 
copyTo) {
+    Map<String, Object> fromMeta = copyFrom.getMetadata();
+    Map<String, Object> toMeta = copyTo.getMetadata();
+    for(String key : fromMeta.keySet()) {
+      Object value = fromMeta.get(key);
+      if(value instanceof Serializable)
+        toMeta.put(key, SerializationUtil.cloneBySerialization(value));
+      else //hope for the best - should be serializable
+        toMeta.put(key, value);
     }
+    return copyTo;
+  }
 
-    public long getStartedAt() {
-        return streamConfig.getStartedAt();
-    }
+  public long getStartedAt() {
+    return streamConfig.getStartedAt();
+  }
 
-    public void setStartedAt() {
-        streamConfig.setStartedAt(DateTime.now().getMillis());
-    }
+  public void setStartedAt() {
+    streamConfig.setStartedAt(DateTime.now().getMillis());
+  }
 
-    public String getStreamIdentifier() {
-        return streamConfig.getIdentifier();
-    }
+  public String getStreamIdentifier() {
+    return streamConfig.getIdentifier();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
index e93ee1d..6df9767 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
@@ -28,67 +28,67 @@ import java.util.concurrent.Executor;
 @Deprecated
 public class LocalStreamProcessMonitorThread implements 
StatusCounterMonitorRunnable
 {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class);
 
-    private Executor executor;
+  private Executor executor;
 
-    private int seconds;
+  private int seconds;
 
-    private boolean run = true;
+  private boolean run = true;
 
-    public LocalStreamProcessMonitorThread(Executor executor, int 
delayInSeconds) {
-        this.executor = executor;
-        this.seconds = delayInSeconds;
-    }
+  public LocalStreamProcessMonitorThread(Executor executor, int 
delayInSeconds) {
+    this.executor = executor;
+    this.seconds = delayInSeconds;
+  }
 
-    @Override
-    public void shutdown(){
-        this.run = false;
-    }
+  @Override
+  public void shutdown(){
+    this.run = false;
+  }
 
-    @Override
-    public boolean isRunning() {
-        return this.run;
-    }
+  @Override
+  public boolean isRunning() {
+    return this.run;
+  }
 
-    @Override
-    public void run()
-    {
-        while(run){
-
-            /**
-             *
-             * Note:
-             * Quick class and method to let us see what is going on with the 
JVM. We need to make sure
-             * that everything is running with as little memory as possible. 
If we are generating a heap
-             * overflow, this will be very apparent by the information shown 
here.
-             */
-
-            MemoryUsage memoryUsage = 
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-
-            String maxMemory = memoryUsage.getMax() == Long.MAX_VALUE ? 
"NO_LIMIT" :
-                    humanReadableByteCount(memoryUsage.getMax(), true);
-
-            String usedMemory = humanReadableByteCount(memoryUsage.getUsed(), 
true);
-
-            LOGGER.debug("[monitor] Used Memory: {}, Max: {}",
-                    usedMemory,
-                    maxMemory);
-
-            try
-            {
-                Thread.sleep(seconds*1000);
-            }
-            catch (InterruptedException e)
-            { }
-        }
-    }
+  @Override
+  public void run()
+  {
+    while(run){
+
+      /**
+       *
+       * Note:
+       * Quick class and method to let us see what is going on with the JVM. 
We need to make sure
+       * that everything is running with as little memory as possible. If we 
are generating a heap
+       * overflow, this will be very apparent by the information shown here.
+       */
+
+      MemoryUsage memoryUsage = 
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+
+      String maxMemory = memoryUsage.getMax() == Long.MAX_VALUE ? "NO_LIMIT" :
+          humanReadableByteCount(memoryUsage.getMax(), true);
+
+      String usedMemory = humanReadableByteCount(memoryUsage.getUsed(), true);
+
+      LOGGER.debug("[monitor] Used Memory: {}, Max: {}",
+          usedMemory,
+          maxMemory);
 
-    public String humanReadableByteCount(long bytes, boolean si) {
-        int unit = si ? 1000 : 1024;
-        if (bytes < unit) return bytes + " B";
-        int exp = (int) (Math.log(bytes) / Math.log(unit));
-        String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp-1) + (si ? "" : 
"i");
-        return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
+      try
+      {
+        Thread.sleep(seconds*1000);
+      }
+      catch (InterruptedException e)
+      { }
     }
+  }
+
+  public String humanReadableByteCount(long bytes, boolean si) {
+    int unit = si ? 1000 : 1024;
+    if (bytes < unit) return bytes + " B";
+    int exp = (int) (Math.log(bytes) / Math.log(unit));
+    String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp-1) + (si ? "" : "i");
+    return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
index 5d4d8b5..9dc91d3 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
@@ -20,6 +20,6 @@ package org.apache.streams.local.tasks;
 
 @Deprecated
 public interface StatusCounterMonitorRunnable extends Runnable {
-    void shutdown();
-    boolean isRunning();
+  void shutdown();
+  boolean isRunning();
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
index c5413db..d59f0d6 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
@@ -19,60 +19,61 @@
 package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.DatumStatusCountable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Deprecated
 public class StatusCounterMonitorThread implements 
StatusCounterMonitorRunnable {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(StatusCounterMonitorThread.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StatusCounterMonitorThread.class);
 
-    private DatumStatusCountable task;
+  private DatumStatusCountable task;
 
-    private int seconds;
+  private int seconds;
 
-    private boolean run = true;
+  private boolean run = true;
 
-    public StatusCounterMonitorThread(DatumStatusCountable task, int 
delayInSeconds) {
-        this.task = task;
-        this.seconds = delayInSeconds;
-    }
+  public StatusCounterMonitorThread(DatumStatusCountable task, int 
delayInSeconds) {
+    this.task = task;
+    this.seconds = delayInSeconds;
+  }
 
-    @Override
-    public void shutdown() {
-        this.run = false;
-    }
+  @Override
+  public void shutdown() {
+    this.run = false;
+  }
 
-    @Override
-    public boolean isRunning() {
-        return this.run;
-    }
+  @Override
+  public boolean isRunning() {
+    return this.run;
+  }
 
-    @Override
-    public void run() {
-        while(run) {
+  @Override
+  public void run() {
+    while(run) {
 
-            /**
-             *
-             * Note:
-             * Quick class and method to let us see what is going on with the 
JVM. We need to make sure
-             * that everything is running with as little memory as possible. 
If we are generating a heap
-             * overflow, this will be very apparent by the information shown 
here.
-             */
+      /**
+       *
+       * Note:
+       * Quick class and method to let us see what is going on with the JVM. 
We need to make sure
+       * that everything is running with as little memory as possible. If we 
are generating a heap
+       * overflow, this will be very apparent by the information shown here.
+       */
 
-            LOGGER.debug("{}: {} attempted, {} success, {} partial, {} failed, 
{} total",
-                    task.getClass(),
-                    task.getDatumStatusCounter().getAttempted(),
-                    task.getDatumStatusCounter().getSuccess(),
-                    task.getDatumStatusCounter().getPartial(),
-                    task.getDatumStatusCounter().getFail(),
-                    task.getDatumStatusCounter().getEmitted());
+      LOGGER.debug("{}: {} attempted, {} success, {} partial, {} failed, {} 
total",
+          task.getClass(),
+          task.getDatumStatusCounter().getAttempted(),
+          task.getDatumStatusCounter().getSuccess(),
+          task.getDatumStatusCounter().getPartial(),
+          task.getDatumStatusCounter().getFail(),
+          task.getDatumStatusCounter().getEmitted());
 
-            try {
-                Thread.sleep(seconds*1000);
-            }
-            catch (InterruptedException e){
-                shutdown();
-            }
-        }
+      try {
+        Thread.sleep(seconds*1000);
+      }
+      catch (InterruptedException e){
+        shutdown();
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 69cd5a5..473d2f4 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -21,9 +21,9 @@ package org.apache.streams.local.tasks;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.local.counters.StreamsTaskCounter;
+
 import org.apache.commons.lang.NotImplementedException;
 
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -34,57 +34,57 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @Deprecated
 public class StreamsMergeTask extends BaseStreamsTask {
 
-    private AtomicBoolean keepRunning;
-    private long sleepTime;
+  private AtomicBoolean keepRunning;
+  private long sleepTime;
 
-    public StreamsMergeTask() {
-        this(null);
-    }
+  public StreamsMergeTask() {
+    this(null);
+  }
 
-    public StreamsMergeTask(StreamsConfiguration streamConfig) {
-        super(streamConfig);
-        this.sleepTime = sleepTime;
-        this.keepRunning = new AtomicBoolean(true);
-    }
+  public StreamsMergeTask(StreamsConfiguration streamConfig) {
+    super(streamConfig);
+    this.sleepTime = sleepTime;
+    this.keepRunning = new AtomicBoolean(true);
+  }
 
-    @Override
-    public void stopTask() {
-        this.keepRunning.set(false);
-    }
+  @Override
+  public void stopTask() {
+    this.keepRunning.set(false);
+  }
 
-    @Override
-    public void setStreamConfig(StreamsConfiguration config) {
+  @Override
+  public void setStreamConfig(StreamsConfiguration config) {
 
-    }
+  }
 
-    @Override
-    public boolean isRunning() {
-        return false;
-    }
+  @Override
+  public boolean isRunning() {
+    return false;
+  }
 
-    @Override
-    public void run() {
-        while(this.keepRunning.get()) {
-            StreamsDatum datum = super.getNextDatum();
-            if(datum != null) {
-                try {
-                    super.addToOutgoingQueue(datum);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-            else {
-                try {
-                    Thread.sleep(this.sleepTime);
-                } catch (InterruptedException e) {
-                    this.keepRunning.set(false);
-                }
-            }
+  @Override
+  public void run() {
+    while(this.keepRunning.get()) {
+      StreamsDatum datum = super.getNextDatum();
+      if(datum != null) {
+        try {
+          super.addToOutgoingQueue(datum);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
         }
+      }
+      else {
+        try {
+          Thread.sleep(this.sleepTime);
+        } catch (InterruptedException e) {
+          this.keepRunning.set(false);
+        }
+      }
     }
+  }
 
-    @Override
-    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
-        throw new NotImplementedException();
-    }
+  @Override
+  public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+    throw new NotImplementedException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index fb97218..5c918b2 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -18,15 +18,22 @@
 
 package org.apache.streams.local.tasks;
 
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.core.util.DatumUtils;
 import org.apache.streams.local.counters.StreamsTaskCounter;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,136 +43,136 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class StreamsPersistWriterTask extends BaseStreamsTask implements 
DatumStatusCountable {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsPersistWriterTask.class);
-
-    private StreamsPersistWriter writer;
-    private AtomicBoolean keepRunning;
-    private StreamsConfiguration streamConfig;
-    private BlockingQueue<StreamsDatum> inQueue;
-    private AtomicBoolean isRunning;
-    private AtomicBoolean blocked;
-    private StreamsTaskCounter counter;
-
-    private DatumStatusCounter statusCounter = new DatumStatusCounter();
-
-    @Override
-    public DatumStatusCounter getDatumStatusCounter() {
-        return this.statusCounter;
-    }
-
-
-    /**
-     * Default constructor.  Uses default sleep of 500ms when inbound queue is 
empty.
-     * @param writer writer to execute in task
-     */
-    public StreamsPersistWriterTask(StreamsPersistWriter writer) {
-        this(writer, null);
-    }
-
-    /**
-     *
-     * @param writer writer to execute in task
-     * @param streamConfig stream config
-     */
-    public StreamsPersistWriterTask(StreamsPersistWriter writer, 
StreamsConfiguration streamConfig) {
-        super(streamConfig);
-        this.streamConfig = super.streamConfig;
-        this.writer = writer;
-        this.keepRunning = new AtomicBoolean(true);
-        this.isRunning = new AtomicBoolean(true);
-        this.blocked = new AtomicBoolean(false);
-    }
-
-    @Override
-    public boolean isWaiting() {
-        return this.inQueue.isEmpty() && this.blocked.get();
-    }
-
-    @Override
-    public void setStreamConfig(StreamsConfiguration config) {
-        this.streamConfig = config;
-    }
-
-    @Override
-    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
-        this.inQueue = inputQueue;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return this.isRunning.get();
-    }
-
-    @Override
-    public void run() {
+  private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsPersistWriterTask.class);
+
+  private StreamsPersistWriter writer;
+  private AtomicBoolean keepRunning;
+  private StreamsConfiguration streamConfig;
+  private BlockingQueue<StreamsDatum> inQueue;
+  private AtomicBoolean isRunning;
+  private AtomicBoolean blocked;
+  private StreamsTaskCounter counter;
+
+  private DatumStatusCounter statusCounter = new DatumStatusCounter();
+
+  @Override
+  public DatumStatusCounter getDatumStatusCounter() {
+    return this.statusCounter;
+  }
+
+
+  /**
+   * Default constructor.  Uses default sleep of 500ms when inbound queue is 
empty.
+   * @param writer writer to execute in task
+   */
+  public StreamsPersistWriterTask(StreamsPersistWriter writer) {
+    this(writer, null);
+  }
+
+  /**
+   *
+   * @param writer writer to execute in task
+   * @param streamConfig stream config
+   */
+  public StreamsPersistWriterTask(StreamsPersistWriter writer, 
StreamsConfiguration streamConfig) {
+    super(streamConfig);
+    this.streamConfig = super.streamConfig;
+    this.writer = writer;
+    this.keepRunning = new AtomicBoolean(true);
+    this.isRunning = new AtomicBoolean(true);
+    this.blocked = new AtomicBoolean(false);
+  }
+
+  @Override
+  public boolean isWaiting() {
+    return this.inQueue.isEmpty() && this.blocked.get();
+  }
+
+  @Override
+  public void setStreamConfig(StreamsConfiguration config) {
+    this.streamConfig = config;
+  }
+
+  @Override
+  public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
+    this.inQueue = inputQueue;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return this.isRunning.get();
+  }
+
+  @Override
+  public void run() {
+    try {
+      this.writer.prepare(this.streamConfig);
+      if(this.counter == null) {
+        this.counter = new 
StreamsTaskCounter(this.writer.getClass().getName()+ 
UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
+      }
+      while(this.keepRunning.get()) {
+        StreamsDatum datum = null;
         try {
-            this.writer.prepare(this.streamConfig);
-            if(this.counter == null) {
-                this.counter = new 
StreamsTaskCounter(this.writer.getClass().getName()+ 
UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
-            }
-            while(this.keepRunning.get()) {
-                StreamsDatum datum = null;
-                try {
-                    this.blocked.set(true);
-                    datum = this.inQueue.poll(5, TimeUnit.SECONDS);
-                } catch (InterruptedException ie) {
-                    LOGGER.debug("Received InterruptedException. Shutting down 
and re-applying interrupt status.");
-                    this.keepRunning.set(false);
-                    if(!this.inQueue.isEmpty()) {
-                        LOGGER.error("Received InteruptedException and input 
queue still has data, count={}, processor={}",this.inQueue.size(), 
this.writer.getClass().getName());
-                    }
-                    Thread.currentThread().interrupt();
-                } finally {
-                    this.blocked.set(false);
-                }
-                if(datum != null) {
-                    this.counter.incrementReceivedCount();
-                    try {
-                        long startTime = System.currentTimeMillis();
-                        this.writer.write(datum);
-                        this.counter.addTime(System.currentTimeMillis() - 
startTime);
-                        statusCounter.incrementStatus(DatumStatus.SUCCESS);
-                    } catch (Exception e) {
-                        LOGGER.error("Error writing to persist writer {}", 
this.writer.getClass().getSimpleName(), e);
-                        this.keepRunning.set(false); // why do we shutdown on 
a failed write ?
-                        statusCounter.incrementStatus(DatumStatus.FAIL);
-                        DatumUtils.addErrorToMetadata(datum, e, 
this.writer.getClass());
-                        this.counter.incrementErrorCount();
-                    }
-                } else { //datums should never be null
-                    LOGGER.trace("Received null StreamsDatum @ writer : {}", 
this.writer.getClass().getName());
-                }
-            }
-            
Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), 
TimeUnit.MILLISECONDS);
-        } catch(Throwable e) {
-            LOGGER.error("Caught Throwable in Persist Writer {} : {}", 
this.writer.getClass().getSimpleName(), e);
+          this.blocked.set(true);
+          datum = this.inQueue.poll(5, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+          LOGGER.debug("Received InterruptedException. Shutting down and 
re-applying interrupt status.");
+          this.keepRunning.set(false);
+          if(!this.inQueue.isEmpty()) {
+            LOGGER.error("Received InteruptedException and input queue still 
has data, count={}, processor={}",this.inQueue.size(), 
this.writer.getClass().getName());
+          }
+          Thread.currentThread().interrupt();
         } finally {
-            
Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), 
TimeUnit.MILLISECONDS);
-            this.writer.cleanUp();
-            this.isRunning.set(false);
+          this.blocked.set(false);
         }
+        if(datum != null) {
+          this.counter.incrementReceivedCount();
+          try {
+            long startTime = System.currentTimeMillis();
+            this.writer.write(datum);
+            this.counter.addTime(System.currentTimeMillis() - startTime);
+            statusCounter.incrementStatus(DatumStatus.SUCCESS);
+          } catch (Exception e) {
+            LOGGER.error("Error writing to persist writer {}", 
this.writer.getClass().getSimpleName(), e);
+            this.keepRunning.set(false); // why do we shutdown on a failed 
write ?
+            statusCounter.incrementStatus(DatumStatus.FAIL);
+            DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass());
+            this.counter.incrementErrorCount();
+          }
+        } else { //datums should never be null
+          LOGGER.trace("Received null StreamsDatum @ writer : {}", 
this.writer.getClass().getName());
+        }
+      }
+      
Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), 
TimeUnit.MILLISECONDS);
+    } catch(Throwable e) {
+      LOGGER.error("Caught Throwable in Persist Writer {} : {}", 
this.writer.getClass().getSimpleName(), e);
+    } finally {
+      
Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), 
TimeUnit.MILLISECONDS);
+      this.writer.cleanUp();
+      this.isRunning.set(false);
     }
-
-    @Override
-    public void stopTask() {
-        this.keepRunning.set(false);
-    }
-
-
-    @Override
-    public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) {
-        throw new UnsupportedOperationException(this.getClass().getName()+" 
does not support method - setOutputQueue()");
-    }
-
-    @Override
-    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
-        List<BlockingQueue<StreamsDatum>> queues = new LinkedList<>();
-        queues.add(this.inQueue);
-        return queues;
-    }
-
-    @Override
-    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
-        this.counter = counter;
-    }
+  }
+
+  @Override
+  public void stopTask() {
+    this.keepRunning.set(false);
+  }
+
+
+  @Override
+  public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) {
+    throw new UnsupportedOperationException(this.getClass().getName()+" does 
not support method - setOutputQueue()");
+  }
+
+  @Override
+  public List<BlockingQueue<StreamsDatum>> getInputQueues() {
+    List<BlockingQueue<StreamsDatum>> queues = new LinkedList<>();
+    queues.add(this.inQueue);
+    return queues;
+  }
+
+  @Override
+  public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+    this.counter = counter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 137c7e1..8720c68 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -18,15 +18,21 @@
 
 package org.apache.streams.local.tasks;
 
-import com.google.common.collect.Maps;
 import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.core.util.DatumUtils;
 import org.apache.streams.local.counters.StreamsTaskCounter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,139 +42,139 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class StreamsProcessorTask extends BaseStreamsTask implements 
DatumStatusCountable {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsProcessorTask.class);
-
-
-    private StreamsProcessor processor;
-    private AtomicBoolean keepRunning;
-    private StreamsConfiguration streamConfig;
-    private BlockingQueue<StreamsDatum> inQueue;
-    private AtomicBoolean isRunning;
-    private AtomicBoolean blocked;
-    private StreamsTaskCounter counter;
-
-    private DatumStatusCounter statusCounter = new DatumStatusCounter();
-
-    @Override
-    public DatumStatusCounter getDatumStatusCounter() {
-        return this.statusCounter;
-    }
-
-    /**
-     * Default constructor, uses default sleep time of 500ms when inbound 
queue is empty
-     * @param processor process to run in task
-     */
-    public StreamsProcessorTask(StreamsProcessor processor) {
-        this(processor, new StreamsConfiguration());
-    }
-
-    /**
-     * @param processor
-     * @param streamConfig
-     */
-    public StreamsProcessorTask(StreamsProcessor processor, 
StreamsConfiguration streamConfig) {
-        super(streamConfig);
-        this.streamConfig = super.streamConfig;
-        this.processor = processor;
-        this.keepRunning = new AtomicBoolean(true);
-        this.isRunning = new AtomicBoolean(true);
-        this.blocked = new AtomicBoolean(true);
-    }
-
-    @Override
-    public boolean isWaiting() {
-        return this.inQueue.isEmpty() && this.blocked.get();
-    }
-
-    @Override
-    public void stopTask() {
-        this.keepRunning.set(false);
-    }
-
-    @Override
-    public void setStreamConfig(StreamsConfiguration config) {
-        this.streamConfig = config;
-    }
-
-    @Override
-    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
-        this.inQueue = inputQueue;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return this.isRunning.get();
-    }
-
-    @Override
-    public void run() {
+  private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsProcessorTask.class);
+
+
+  private StreamsProcessor processor;
+  private AtomicBoolean keepRunning;
+  private StreamsConfiguration streamConfig;
+  private BlockingQueue<StreamsDatum> inQueue;
+  private AtomicBoolean isRunning;
+  private AtomicBoolean blocked;
+  private StreamsTaskCounter counter;
+
+  private DatumStatusCounter statusCounter = new DatumStatusCounter();
+
+  @Override
+  public DatumStatusCounter getDatumStatusCounter() {
+    return this.statusCounter;
+  }
+
+  /**
+   * Default constructor, uses default sleep time of 500ms when inbound queue 
is empty
+   * @param processor process to run in task
+   */
+  public StreamsProcessorTask(StreamsProcessor processor) {
+    this(processor, new StreamsConfiguration());
+  }
+
+  /**
+   * @param processor
+   * @param streamConfig
+   */
+  public StreamsProcessorTask(StreamsProcessor processor, StreamsConfiguration 
streamConfig) {
+    super(streamConfig);
+    this.streamConfig = super.streamConfig;
+    this.processor = processor;
+    this.keepRunning = new AtomicBoolean(true);
+    this.isRunning = new AtomicBoolean(true);
+    this.blocked = new AtomicBoolean(true);
+  }
+
+  @Override
+  public boolean isWaiting() {
+    return this.inQueue.isEmpty() && this.blocked.get();
+  }
+
+  @Override
+  public void stopTask() {
+    this.keepRunning.set(false);
+  }
+
+  @Override
+  public void setStreamConfig(StreamsConfiguration config) {
+    this.streamConfig = config;
+  }
+
+  @Override
+  public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
+    this.inQueue = inputQueue;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return this.isRunning.get();
+  }
+
+  @Override
+  public void run() {
+    try {
+      this.processor.prepare(this.streamConfig);
+      if(this.counter == null) {
+        this.counter = new 
StreamsTaskCounter(this.processor.getClass().getName()+ 
UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
+      }
+      while(this.keepRunning.get()) {
+        StreamsDatum datum = null;
         try {
-            this.processor.prepare(this.streamConfig);
-            if(this.counter == null) {
-                this.counter = new 
StreamsTaskCounter(this.processor.getClass().getName()+ 
UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
-            }
-            while(this.keepRunning.get()) {
-                StreamsDatum datum = null;
-                try {
-                    this.blocked.set(true);
-                    datum = 
this.inQueue.poll(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-                } catch (InterruptedException ie) {
-                    LOGGER.debug("Received InteruptedException, shutting down 
and re-applying interrupt status.");
-                    this.keepRunning.set(false);
-                    if(!this.inQueue.isEmpty()) {
-                        LOGGER.error("Received InteruptedException and input 
queue still has data, count={}, processor={}",this.inQueue.size(), 
this.processor.getClass().getName());
-                    }
-                    Thread.currentThread().interrupt();
-                } finally {
-                    this.blocked.set(false);
-                }
-                if(datum != null) {
-                    this.counter.incrementReceivedCount();
-                    try {
-                        long startTime = System.currentTimeMillis();
-                        List<StreamsDatum> output = 
this.processor.process(datum);
-                        this.counter.addTime(System.currentTimeMillis() - 
startTime);
-                        if(output != null) {
-                            for(StreamsDatum outDatum : output) {
-                                super.addToOutgoingQueue(outDatum);
-                                this.counter.incrementEmittedCount();
-                                
statusCounter.incrementStatus(DatumStatus.SUCCESS);
-                            }
-                        }
-                    } catch (InterruptedException ie) {
-                        LOGGER.warn("Received InterruptedException, shutting 
down and re-applying interrupt status.");
-                        this.keepRunning.set(false);
-                        Thread.currentThread().interrupt();
-                    } catch (Throwable t) {
-                        this.counter.incrementErrorCount();
-                        LOGGER.warn("Caught Throwable in processor, {} : {}", 
this.processor.getClass().getName(), t);
-                        statusCounter.incrementStatus(DatumStatus.FAIL);
-                        //Add the error to the metadata, but keep processing
-                        DatumUtils.addErrorToMetadata(datum, t, 
this.processor.getClass());
-                    }
-                } else {
-                    LOGGER.trace("Removed NULL datum from queue at processor : 
{}", this.processor.getClass().getName());
-                }
-            }
-        } catch(Throwable e) {
-            LOGGER.error("Caught Throwable in Processor {}", 
this.processor.getClass().getSimpleName(), e);
+          this.blocked.set(true);
+          datum = this.inQueue.poll(streamConfig.getBatchFrequencyMs(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+          LOGGER.debug("Received InteruptedException, shutting down and 
re-applying interrupt status.");
+          this.keepRunning.set(false);
+          if(!this.inQueue.isEmpty()) {
+            LOGGER.error("Received InteruptedException and input queue still 
has data, count={}, processor={}",this.inQueue.size(), 
this.processor.getClass().getName());
+          }
+          Thread.currentThread().interrupt();
         } finally {
-            this.isRunning.set(false);
-            this.processor.cleanUp();
+          this.blocked.set(false);
         }
+        if(datum != null) {
+          this.counter.incrementReceivedCount();
+          try {
+            long startTime = System.currentTimeMillis();
+            List<StreamsDatum> output = this.processor.process(datum);
+            this.counter.addTime(System.currentTimeMillis() - startTime);
+            if(output != null) {
+              for(StreamsDatum outDatum : output) {
+                super.addToOutgoingQueue(outDatum);
+                this.counter.incrementEmittedCount();
+                statusCounter.incrementStatus(DatumStatus.SUCCESS);
+              }
+            }
+          } catch (InterruptedException ie) {
+            LOGGER.warn("Received InterruptedException, shutting down and 
re-applying interrupt status.");
+            this.keepRunning.set(false);
+            Thread.currentThread().interrupt();
+          } catch (Throwable t) {
+            this.counter.incrementErrorCount();
+            LOGGER.warn("Caught Throwable in processor, {} : {}", 
this.processor.getClass().getName(), t);
+            statusCounter.incrementStatus(DatumStatus.FAIL);
+            //Add the error to the metadata, but keep processing
+            DatumUtils.addErrorToMetadata(datum, t, this.processor.getClass());
+          }
+        } else {
+          LOGGER.trace("Removed NULL datum from queue at processor : {}", 
this.processor.getClass().getName());
+        }
+      }
+    } catch(Throwable e) {
+      LOGGER.error("Caught Throwable in Processor {}", 
this.processor.getClass().getSimpleName(), e);
+    } finally {
+      this.isRunning.set(false);
+      this.processor.cleanUp();
     }
-
-    @Override
-    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
-        List<BlockingQueue<StreamsDatum>> queues = new 
LinkedList<BlockingQueue<StreamsDatum>>();
-        queues.add(this.inQueue);
-        return queues;
-    }
-
-    @Override
-    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
-        this.counter = counter;
-    }
+  }
+
+  @Override
+  public List<BlockingQueue<StreamsDatum>> getInputQueues() {
+    List<BlockingQueue<StreamsDatum>> queues = new 
LinkedList<BlockingQueue<StreamsDatum>>();
+    queues.add(this.inQueue);
+    return queues;
+  }
+
+  @Override
+  public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+    this.counter = counter;
+  }
 
 
 }

Reply via email to