http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java index deb657a..aae12a0 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java @@ -18,11 +18,11 @@ package org.apache.streams.local.queues; import org.apache.streams.local.builders.LocalStreamBuilder; + +import org.apache.commons.lang.NotImplementedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang.NotImplementedException; -import javax.management.*; import java.lang.management.ManagementFactory; import java.util.Collection; import java.util.Iterator; @@ -32,6 +32,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; /** * A {@link java.util.concurrent.BlockingQueue} implementation that allows the measure measurement of how @@ -44,437 +50,437 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean { - public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s"; - - private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class); - - private BlockingQueue<ThroughputElement<E>> underlyingQueue; - private AtomicLong elementsAdded; - private AtomicLong elementsRemoved; - private AtomicLong startTime; - private AtomicLong totalQueueTime; - private long maxQueuedTime; - private volatile boolean active; - private ReadWriteLock maxQueueTimeLock; - - /** - * Creates an unbounded, unregistered {@code ThroughputQueue} - */ - public ThroughputQueue() { - this(-1, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class); + + private BlockingQueue<ThroughputElement<E>> underlyingQueue; + private AtomicLong elementsAdded; + private AtomicLong elementsRemoved; + private AtomicLong startTime; + private AtomicLong totalQueueTime; + private long maxQueuedTime; + private volatile boolean active; + private ReadWriteLock maxQueueTimeLock; + + /** + * Creates an unbounded, unregistered {@code ThroughputQueue} + */ + public ThroughputQueue() { + this(-1, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + /** + * + * @param streamIdentifier + * @param startedAt + */ + public ThroughputQueue(String streamIdentifier, long startedAt) { + this(-1, null, streamIdentifier, startedAt); + } + + /** + * Creates a bounded, unregistered {@code ThroughputQueue} + * + * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded + */ + public ThroughputQueue(int maxSize) { + this(maxSize, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + /** + * + * @param maxSize + * @param streamIdentifier + * @param startedAt + */ + public ThroughputQueue(int maxSize, String streamIdentifier, long startedAt) { + this(maxSize, null, streamIdentifier, startedAt); + } + + /** + * Creates an unbounded, registered {@code ThroughputQueue} + * + * @param id unique id for this queue to be registered with. if id == NULL then not registered + */ + public ThroughputQueue(String id) { + this(-1, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + /** + * + * @param id + * @param streamIdentifier + * @param startedAt + */ + public ThroughputQueue(String id, String streamIdentifier, long startedAt) { + this(-1, id, streamIdentifier, startedAt); + } + + /** + * + * @param maxSize + * @param id + */ + public ThroughputQueue(int maxSize, String id) { + this(maxSize, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + + } + + /** + * Creates a bounded, registered {@code ThroughputQueue} + * + * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded + * @param id unique id for this queue to be registered with. if id == NULL then not registered + */ + public ThroughputQueue(int maxSize, String id, String streamIdentifier, long startedAt) { + if (maxSize < 1) { + this.underlyingQueue = new LinkedBlockingQueue<>(); + } else { + this.underlyingQueue = new LinkedBlockingQueue<>(maxSize); + } + this.elementsAdded = new AtomicLong(0); + this.elementsRemoved = new AtomicLong(0); + this.startTime = new AtomicLong(-1); + this.active = false; + this.maxQueuedTime = 0; + this.maxQueueTimeLock = new ReentrantReadWriteLock(); + this.totalQueueTime = new AtomicLong(0); + if (id != null) { + try { + ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt)); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(this, name); + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { + LOGGER.error("Failed to register MXBean : {}", e); + throw new RuntimeException(e); + } + } + } + + @Override + public boolean add(E e) { + if (this.underlyingQueue.add(new ThroughputElement<E>(e))) { + internalAddElement(); + return true; + } + return false; + } + + @Override + public boolean offer(E e) { + if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) { + internalAddElement(); + return true; + } + return false; + } + + @Override + public void put(E e) throws InterruptedException { + this.underlyingQueue.put(new ThroughputElement<E>(e)); + internalAddElement(); + } + + @Override + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) { + internalAddElement(); + return true; + } + return false; + } + + @Override + public E take() throws InterruptedException { + ThroughputElement<E> e = this.underlyingQueue.take(); + internalRemoveElement(e); + return e.getElement(); + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit); + if(e != null) { + internalRemoveElement(e); + return e.getElement(); + } + return null; + } + + @Override + public int remainingCapacity() { + return this.underlyingQueue.remainingCapacity(); + } + + @Override + public boolean remove(Object o) { + try { + return this.underlyingQueue.remove(new ThroughputElement<E>((E) o)); + } catch (ClassCastException cce) { + return false; + } + } + + @Override + public boolean contains(Object o) { + try { + return this.underlyingQueue.contains(new ThroughputElement<E>((E) o)); + } catch (ClassCastException cce) { + return false; + } + } + + @Override + public int drainTo(Collection<? super E> c) { + throw new NotImplementedException(); + } + + @Override + public int drainTo(Collection<? super E> c, int maxElements) { + throw new NotImplementedException(); + } + + @Override + public E remove() { + ThroughputElement<E> e = this.underlyingQueue.remove(); + if(e != null) { + internalRemoveElement(e); + return e.getElement(); + } + return null; + } + + @Override + public E poll() { + ThroughputElement<E> e = this.underlyingQueue.poll(); + if(e != null) { + internalRemoveElement(e); + return e.getElement(); + } + return null; + } + + @Override + public E element() { + throw new NotImplementedException(); + } + + @Override + public E peek() { + ThroughputElement<E> e = this.underlyingQueue.peek(); + if( e != null) { + return e.getElement(); + } + return null; + } + + @Override + public int size() { + return this.underlyingQueue.size(); + } + + @Override + public boolean isEmpty() { + return this.underlyingQueue.isEmpty(); + } + + @Override + public Iterator<E> iterator() { + throw new NotImplementedException(); + } + + @Override + public Object[] toArray() { + throw new NotImplementedException(); + } + + @Override + public <T> T[] toArray(T[] a) { + throw new NotImplementedException(); + } + + @Override + public boolean containsAll(Collection<?> c) { + throw new NotImplementedException(); + } + + @Override + public boolean addAll(Collection<? extends E> c) { + throw new NotImplementedException(); + } + + @Override + public boolean removeAll(Collection<?> c) { + throw new NotImplementedException(); + } + + @Override + public boolean retainAll(Collection<?> c) { + throw new NotImplementedException(); + } + + @Override + public void clear() { + throw new NotImplementedException(); + } + + @Override + public long getCurrentSize() { + return this.elementsAdded.get() - this.elementsRemoved.get(); + } + + /** + * If elements have been removed from the queue or no elements have been added, it returns the average wait time + * in milliseconds. If elements have been added, but none have been removed, it returns the time waited by the first + * element in the queue. + * + * @return the average wait time in milliseconds + */ + @Override + public double getAvgWait() { + if (this.elementsRemoved.get() == 0) { + if (this.getCurrentSize() > 0) { + return this.underlyingQueue.peek().getWaited(); + } else { + return 0.0; + } + } else { + return (double) this.totalQueueTime.get() / (double) this.elementsRemoved.get(); + } + } + + @Override + public long getMaxWait() { + ThroughputElement<E> e = this.underlyingQueue.peek(); + long max = -1; + try { + this.maxQueueTimeLock.readLock().lock(); + if (e != null && e.getWaited() > this.maxQueuedTime) { + max = e.getWaited(); + } else { + max = this.maxQueuedTime; + } + } finally { + this.maxQueueTimeLock.readLock().unlock(); + } + return max; + } + + @Override + public long getRemoved() { + return this.elementsRemoved.get(); + } + + @Override + public long getAdded() { + return this.elementsAdded.get(); + } + + @Override + public double getThroughput() { + if (active) { + return this.elementsRemoved.get() / ((System.currentTimeMillis() - this.startTime.get()) / 1000.0); + } + return 0.0; + } + + /** + * Handles updating the stats whenever elements are added to the queue + */ + private void internalAddElement() { + this.elementsAdded.incrementAndGet(); + synchronized (this) { + if (!this.active) { + this.startTime.set(System.currentTimeMillis()); + this.active = true; + } + } + } + + /** + * Handle updating the stats whenever elements are removed from the queue + * @param e Element removed + */ + private void internalRemoveElement(ThroughputElement<E> e) { + if(e != null) { + this.elementsRemoved.incrementAndGet(); + Long queueTime = e.getWaited(); + this.totalQueueTime.addAndGet(queueTime); + boolean unlocked = false; + try { + this.maxQueueTimeLock.readLock().lock(); + if (this.maxQueuedTime < queueTime) { + this.maxQueueTimeLock.readLock().unlock(); + unlocked = true; + try { + this.maxQueueTimeLock.writeLock().lock(); + this.maxQueuedTime = queueTime; + } finally { + this.maxQueueTimeLock.writeLock().unlock(); + } + } + } finally { + if (!unlocked) + this.maxQueueTimeLock.readLock().unlock(); + } } + } - /** - * - * @param streamIdentifier - * @param startedAt - */ - public ThroughputQueue(String streamIdentifier, long startedAt) { - this(-1, null, streamIdentifier, startedAt); - } - /** - * Creates a bounded, unregistered {@code ThroughputQueue} - * - * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded - */ - public ThroughputQueue(int maxSize) { - this(maxSize, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); - } + /** + * Element wrapper to measure time waiting on the queue + * + * @param <E> + */ + private class ThroughputElement<E> { - /** - * - * @param maxSize - * @param streamIdentifier - * @param startedAt - */ - public ThroughputQueue(int maxSize, String streamIdentifier, long startedAt) { - this(maxSize, null, streamIdentifier, startedAt); - } - - /** - * Creates an unbounded, registered {@code ThroughputQueue} - * - * @param id unique id for this queue to be registered with. if id == NULL then not registered - */ - public ThroughputQueue(String id) { - this(-1, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); - } + private long queuedTime; + private E element; - /** - * - * @param id - * @param streamIdentifier - * @param startedAt - */ - public ThroughputQueue(String id, String streamIdentifier, long startedAt) { - this(-1, id, streamIdentifier, startedAt); + protected ThroughputElement(E element) { + this.element = element; + this.queuedTime = System.currentTimeMillis(); } /** + * Get the time this element has been waiting on the queue. + * current time - time element was queued * - * @param maxSize - * @param id + * @return time this element has been waiting on the queue in milliseconds */ - public ThroughputQueue(int maxSize, String id) { - this(maxSize, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); - + public long getWaited() { + return System.currentTimeMillis() - this.queuedTime; } /** - * Creates a bounded, registered {@code ThroughputQueue} + * Get the queued element * - * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded - * @param id unique id for this queue to be registered with. if id == NULL then not registered + * @return the element */ - public ThroughputQueue(int maxSize, String id, String streamIdentifier, long startedAt) { - if (maxSize < 1) { - this.underlyingQueue = new LinkedBlockingQueue<>(); - } else { - this.underlyingQueue = new LinkedBlockingQueue<>(maxSize); - } - this.elementsAdded = new AtomicLong(0); - this.elementsRemoved = new AtomicLong(0); - this.startTime = new AtomicLong(-1); - this.active = false; - this.maxQueuedTime = 0; - this.maxQueueTimeLock = new ReentrantReadWriteLock(); - this.totalQueueTime = new AtomicLong(0); - if (id != null) { - try { - ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt)); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, name); - } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { - LOGGER.error("Failed to register MXBean : {}", e); - throw new RuntimeException(e); - } - } - } - - @Override - public boolean add(E e) { - if (this.underlyingQueue.add(new ThroughputElement<E>(e))) { - internalAddElement(); - return true; - } - return false; - } - - @Override - public boolean offer(E e) { - if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) { - internalAddElement(); - return true; - } - return false; - } - - @Override - public void put(E e) throws InterruptedException { - this.underlyingQueue.put(new ThroughputElement<E>(e)); - internalAddElement(); - } - - @Override - public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { - if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) { - internalAddElement(); - return true; - } - return false; - } - - @Override - public E take() throws InterruptedException { - ThroughputElement<E> e = this.underlyingQueue.take(); - internalRemoveElement(e); - return e.getElement(); - } - - @Override - public E poll(long timeout, TimeUnit unit) throws InterruptedException { - ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit); - if(e != null) { - internalRemoveElement(e); - return e.getElement(); - } - return null; - } - - @Override - public int remainingCapacity() { - return this.underlyingQueue.remainingCapacity(); - } - - @Override - public boolean remove(Object o) { - try { - return this.underlyingQueue.remove(new ThroughputElement<E>((E) o)); - } catch (ClassCastException cce) { - return false; - } - } - - @Override - public boolean contains(Object o) { - try { - return this.underlyingQueue.contains(new ThroughputElement<E>((E) o)); - } catch (ClassCastException cce) { - return false; - } - } - - @Override - public int drainTo(Collection<? super E> c) { - throw new NotImplementedException(); - } - - @Override - public int drainTo(Collection<? super E> c, int maxElements) { - throw new NotImplementedException(); - } - - @Override - public E remove() { - ThroughputElement<E> e = this.underlyingQueue.remove(); - if(e != null) { - internalRemoveElement(e); - return e.getElement(); - } - return null; - } - - @Override - public E poll() { - ThroughputElement<E> e = this.underlyingQueue.poll(); - if(e != null) { - internalRemoveElement(e); - return e.getElement(); - } - return null; + public E getElement() { + return this.element; } - @Override - public E element() { - throw new NotImplementedException(); - } - - @Override - public E peek() { - ThroughputElement<E> e = this.underlyingQueue.peek(); - if( e != null) { - return e.getElement(); - } - return null; - } - - @Override - public int size() { - return this.underlyingQueue.size(); - } - - @Override - public boolean isEmpty() { - return this.underlyingQueue.isEmpty(); - } - - @Override - public Iterator<E> iterator() { - throw new NotImplementedException(); - } - - @Override - public Object[] toArray() { - throw new NotImplementedException(); - } - - @Override - public <T> T[] toArray(T[] a) { - throw new NotImplementedException(); - } - - @Override - public boolean containsAll(Collection<?> c) { - throw new NotImplementedException(); - } - - @Override - public boolean addAll(Collection<? extends E> c) { - throw new NotImplementedException(); - } - - @Override - public boolean removeAll(Collection<?> c) { - throw new NotImplementedException(); - } - - @Override - public boolean retainAll(Collection<?> c) { - throw new NotImplementedException(); - } - - @Override - public void clear() { - throw new NotImplementedException(); - } - - @Override - public long getCurrentSize() { - return this.elementsAdded.get() - this.elementsRemoved.get(); - } /** - * If elements have been removed from the queue or no elements have been added, it returns the average wait time - * in milliseconds. If elements have been added, but none have been removed, it returns the time waited by the first - * element in the queue. - * - * @return the average wait time in milliseconds + * Measures equality by the element and ignores the queued time + * @param obj + * @return */ @Override - public double getAvgWait() { - if (this.elementsRemoved.get() == 0) { - if (this.getCurrentSize() > 0) { - return this.underlyingQueue.peek().getWaited(); - } else { - return 0.0; - } + public boolean equals(Object obj) { + if(obj instanceof ThroughputElement && obj != null) { + ThroughputElement that = (ThroughputElement) obj; + if(that.getElement() == null && this.getElement() == null) { + return true; + } else if(that.getElement() != null) { + return that.getElement().equals(this.getElement()); } else { - return (double) this.totalQueueTime.get() / (double) this.elementsRemoved.get(); - } - } - - @Override - public long getMaxWait() { - ThroughputElement<E> e = this.underlyingQueue.peek(); - long max = -1; - try { - this.maxQueueTimeLock.readLock().lock(); - if (e != null && e.getWaited() > this.maxQueuedTime) { - max = e.getWaited(); - } else { - max = this.maxQueuedTime; - } - } finally { - this.maxQueueTimeLock.readLock().unlock(); - } - return max; - } - - @Override - public long getRemoved() { - return this.elementsRemoved.get(); - } - - @Override - public long getAdded() { - return this.elementsAdded.get(); - } - - @Override - public double getThroughput() { - if (active) { - return this.elementsRemoved.get() / ((System.currentTimeMillis() - this.startTime.get()) / 1000.0); - } - return 0.0; - } - - /** - * Handles updating the stats whenever elements are added to the queue - */ - private void internalAddElement() { - this.elementsAdded.incrementAndGet(); - synchronized (this) { - if (!this.active) { - this.startTime.set(System.currentTimeMillis()); - this.active = true; - } - } - } - - /** - * Handle updating the stats whenever elements are removed from the queue - * @param e Element removed - */ - private void internalRemoveElement(ThroughputElement<E> e) { - if(e != null) { - this.elementsRemoved.incrementAndGet(); - Long queueTime = e.getWaited(); - this.totalQueueTime.addAndGet(queueTime); - boolean unlocked = false; - try { - this.maxQueueTimeLock.readLock().lock(); - if (this.maxQueuedTime < queueTime) { - this.maxQueueTimeLock.readLock().unlock(); - unlocked = true; - try { - this.maxQueueTimeLock.writeLock().lock(); - this.maxQueuedTime = queueTime; - } finally { - this.maxQueueTimeLock.writeLock().unlock(); - } - } - } finally { - if (!unlocked) - this.maxQueueTimeLock.readLock().unlock(); - } - } - } - - - /** - * Element wrapper to measure time waiting on the queue - * - * @param <E> - */ - private class ThroughputElement<E> { - - private long queuedTime; - private E element; - - protected ThroughputElement(E element) { - this.element = element; - this.queuedTime = System.currentTimeMillis(); - } - - /** - * Get the time this element has been waiting on the queue. - * current time - time element was queued - * - * @return time this element has been waiting on the queue in milliseconds - */ - public long getWaited() { - return System.currentTimeMillis() - this.queuedTime; - } - - /** - * Get the queued element - * - * @return the element - */ - public E getElement() { - return this.element; - } - - - /** - * Measures equality by the element and ignores the queued time - * @param obj - * @return - */ - @Override - public boolean equals(Object obj) { - if(obj instanceof ThroughputElement && obj != null) { - ThroughputElement that = (ThroughputElement) obj; - if(that.getElement() == null && this.getElement() == null) { - return true; - } else if(that.getElement() != null) { - return that.getElement().equals(this.getElement()); - } else { - return false; - } - } - return false; + return false; } + } + return false; } + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java index 571a035..9cc4593 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java @@ -17,50 +17,48 @@ */ package org.apache.streams.local.queues; -import javax.management.MXBean; - /** * MXBean capable queue that monitors the throughput of the queue */ public interface ThroughputQueueMXBean { - /** - * Returns the number of items on the queue. - * @return number of items on queue - */ - public long getCurrentSize(); + /** + * Returns the number of items on the queue. + * @return number of items on queue + */ + public long getCurrentSize(); - /** - * Get the average time an item spends in queue in milliseconds - * @return average time an item spends in queue in milliseconds - */ - public double getAvgWait(); + /** + * Get the average time an item spends in queue in milliseconds + * @return average time an item spends in queue in milliseconds + */ + public double getAvgWait(); - /** - * Get the maximum time an item has spent on the queue before being removed from the queue. - * @return the maximum time an item has spent on the queue - */ - public long getMaxWait(); + /** + * Get the maximum time an item has spent on the queue before being removed from the queue. + * @return the maximum time an item has spent on the queue + */ + public long getMaxWait(); - /** - * Get the number of items that have been removed from this queue - * @return number of items that have been removed from the queue - */ - public long getRemoved(); + /** + * Get the number of items that have been removed from this queue + * @return number of items that have been removed from the queue + */ + public long getRemoved(); - /** - * Get the number of items that have been added to the queue - * @return number of items that have been added to the queue - */ - public long getAdded(); + /** + * Get the number of items that have been added to the queue + * @return number of items that have been added to the queue + */ + public long getAdded(); - /** - * Get the the throughput of the queue measured by the number of items removed from the queue - * dived by the time the queue has been active. - * Active time starts once the first item has been placed on the queue - * @return throughput of queue. items/sec, items removed / time active - */ - public double getThroughput(); + /** + * Get the the throughput of the queue measured by the number of items removed from the queue + * dived by the time the queue has been active. + * Active time starts once the first item has been placed on the queue + * @return throughput of queue. items/sec, items removed / time active + */ + public double getThroughput(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 8bc7769..5f4bfdb 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -18,200 +18,202 @@ package org.apache.streams.local.tasks; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.pojo.json.Activity; -import org.apache.streams.util.ComponentUtils; import org.apache.streams.util.SerializationUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** - * + * BaseStreamsTask is the primary abstract StreamsTask. */ public abstract class BaseStreamsTask implements StreamsTask { - private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamsTask.class); - - private List<BlockingQueue<StreamsDatum>> inQueues = new ArrayList<BlockingQueue<StreamsDatum>>(); - private List<BlockingQueue<StreamsDatum>> outQueues = new LinkedList<BlockingQueue<StreamsDatum>>(); - private int inIndex = 0; - private ObjectMapper mapper; - protected StreamsConfiguration streamConfig; - - public BaseStreamsTask(StreamsConfiguration config) { - this.mapper = StreamsJacksonMapper.getInstance(); - this.mapper.registerSubtypes(Activity.class); - if( config != null ) - this.streamConfig = config; - else - this.streamConfig = StreamsConfigurator.detectConfiguration(); - - setStartedAt(); - } - - - @Override - public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { - this.inQueues.add(inputQueue); - } - - @Override - public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) { - this.outQueues.add(outputQueue); + private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamsTask.class); + + private List<BlockingQueue<StreamsDatum>> inQueues = new ArrayList<BlockingQueue<StreamsDatum>>(); + private List<BlockingQueue<StreamsDatum>> outQueues = new LinkedList<BlockingQueue<StreamsDatum>>(); + private int inIndex = 0; + private ObjectMapper mapper; + protected StreamsConfiguration streamConfig; + + public BaseStreamsTask(StreamsConfiguration config) { + this.mapper = StreamsJacksonMapper.getInstance(); + this.mapper.registerSubtypes(Activity.class); + if( config != null ) { + this.streamConfig = config; + } else { + this.streamConfig = StreamsConfigurator.detectConfiguration(); } - - @Override - public List<BlockingQueue<StreamsDatum>> getInputQueues() { - return this.inQueues; + setStartedAt(); + } + + @Override + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { + this.inQueues.add(inputQueue); + } + + @Override + public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) { + this.outQueues.add(outputQueue); + } + + @Override + public List<BlockingQueue<StreamsDatum>> getInputQueues() { + return this.inQueues; + } + + @Override + public List<BlockingQueue<StreamsDatum>> getOutputQueues() { + return this.outQueues; + } + + /** + * SHOULD NOT BE NECCESARY, WILL REMOVE. + * Round Robins through input queues to get the next StreamsDatum. If all input queues are empty, it will return null. + * @return the next StreamsDatum or null if all input queues are empty. + */ + @Deprecated + protected StreamsDatum getNextDatum() { + int startIndex = this.inIndex; + int index = startIndex; + StreamsDatum datum = null; + do { + datum = this.inQueues.get(index).poll(); + index = getNextInputQueueIndex(); + } while( datum == null && startIndex != index); + return datum; + } + + /** + * Adds a StreamDatum to the outgoing queues. If there are multiple queues, it uses serialization to create + * clones of the datum and adds a new clone to each queue. + * @param datum + */ + protected void addToOutgoingQueue(StreamsDatum datum) throws InterruptedException{ + if(this.outQueues.size() == 1) { + outQueues.get(0).put(datum); } - - @Override - public List<BlockingQueue<StreamsDatum>> getOutputQueues() { - return this.outQueues; - } - - /** - * SHOULD NOT BE NECCESARY, WILL REMOVE. - * Round Robins through input queues to get the next StreamsDatum. If all input queues are empty, it will return null. - * @return the next StreamsDatum or null if all input queues are empty. - */ - @Deprecated - protected StreamsDatum getNextDatum() { - int startIndex = this.inIndex; - int index = startIndex; - StreamsDatum datum = null; - do { - datum = this.inQueues.get(index).poll(); - index = getNextInputQueueIndex(); - } while( datum == null && startIndex != index); - return datum; - } - - /** - * Adds a StreamDatum to the outgoing queues. If there are multiple queues, it uses serialization to create - * clones of the datum and adds a new clone to each queue. - * @param datum - */ - protected void addToOutgoingQueue(StreamsDatum datum) throws InterruptedException{ - if(this.outQueues.size() == 1) { - outQueues.get(0).put(datum); - } - else { - List<BlockingQueue<StreamsDatum>> toOutput = Lists.newLinkedList(this.outQueues); - while(!toOutput.isEmpty()) { - for (BlockingQueue<StreamsDatum> queue : toOutput) { - StreamsDatum newDatum = cloneStreamsDatum(datum); - if (newDatum != null) { - if (queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) { - toOutput.remove(queue); - } - } - } + else { + List<BlockingQueue<StreamsDatum>> toOutput = Lists.newLinkedList(this.outQueues); + while(!toOutput.isEmpty()) { + for (BlockingQueue<StreamsDatum> queue : toOutput) { + StreamsDatum newDatum = cloneStreamsDatum(datum); + if (newDatum != null) { + if (queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) { + toOutput.remove(queue); } + } } + } } + } - @Override - public boolean isWaiting() { - if(this.inQueues == null || this.inQueues.size() == 0) { - return true; - } - boolean empty = true; - for(Queue queue : this.inQueues) { - empty = empty && queue.isEmpty(); - } - return empty; + @Override + public boolean isWaiting() { + if(this.inQueues == null || this.inQueues.size() == 0) { + return true; } - - /** - * //TODO LOCAL MODE HACK. Need to fix - * In order for our data streams to ported to other data flow frame works(Storm, Hadoop, Spark, etc) we need to be able to - * enforce the serialization required by each framework. This needs some thought and design before a final solution is - * made. - * - * In order to be able to copy/clone StreamDatums the orginal idea was to force all StreamsDatums to be java serializable. - * This was seen as unacceptable for local mode. So until we come up with a solution to enforce serialization and be - * compatiable across multiple frame works, this hack is in place. - * - * If datum.document is Serializable, we use serialization to clone a new copy. If it is not Serializable we attempt - * different methods using an com.fasterxml.jackson.databind.ObjectMapper to copy/clone the StreamsDatum. If the object - * is not clonable by these methods, an error is reported to the logging and a NULL object is returned. - * - * @param datum - * @return - */ - protected StreamsDatum cloneStreamsDatum(StreamsDatum datum) { - try { - - if(datum.document instanceof ObjectNode) { - return copyMetaData(datum, new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid)); - } - else if(datum.document instanceof Activity) { - - return copyMetaData(datum, new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class), - datum.timestamp, - datum.sequenceid)); - } + boolean empty = true; + for(Queue queue : this.inQueues) { + empty = empty && queue.isEmpty(); + } + return empty; + } + + /** + * //TODO LOCAL MODE HACK. Need to fix + * In order for our data streams to ported to other data flow frame works(Storm, Hadoop, Spark, etc) we need to be able to + * enforce the serialization required by each framework. This needs some thought and design before a final solution is + * made. + * + * In order to be able to copy/clone StreamDatums the orginal idea was to force all StreamsDatums to be java serializable. + * This was seen as unacceptable for local mode. So until we come up with a solution to enforce serialization and be + * compatiable across multiple frame works, this hack is in place. + * + * If datum.document is Serializable, we use serialization to clone a new copy. If it is not Serializable we attempt + * different methods using an com.fasterxml.jackson.databind.ObjectMapper to copy/clone the StreamsDatum. If the object + * is not clonable by these methods, an error is reported to the logging and a NULL object is returned. + * + * @param datum + * @return + */ + protected StreamsDatum cloneStreamsDatum(StreamsDatum datum) { + try { + + if(datum.document instanceof ObjectNode) { + return copyMetaData(datum, new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid)); + } + else if(datum.document instanceof Activity) { + + return copyMetaData(datum, new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class), + datum.timestamp, + datum.sequenceid)); + } // else if(this.mapper.canSerialize(datum.document.getClass())){ // return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()), // datum.timestamp, // datum.sequenceid); // } - else if(datum.document instanceof Serializable) { - return (StreamsDatum) SerializationUtil.cloneBySerialization(datum); - } - } catch (Exception e) { - LOGGER.error("Exception while trying to clone/copy StreamsDatum : {}", e); - } - LOGGER.error("Failed to clone/copy StreamsDatum with document of class : {}", datum.document.getClass().getName()); - return null; + else if(datum.document instanceof Serializable) { + return (StreamsDatum) SerializationUtil.cloneBySerialization(datum); + } + } catch (Exception e) { + LOGGER.error("Exception while trying to clone/copy StreamsDatum : {}", e); } - - private int getNextInputQueueIndex() { - ++this.inIndex; - if(this.inIndex >= this.inQueues.size()) { - this.inIndex = 0; - } - return this.inIndex; + LOGGER.error("Failed to clone/copy StreamsDatum with document of class : {}", datum.document.getClass().getName()); + return null; + } + + private int getNextInputQueueIndex() { + ++this.inIndex; + if(this.inIndex >= this.inQueues.size()) { + this.inIndex = 0; } - - private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) { - Map<String, Object> fromMeta = copyFrom.getMetadata(); - Map<String, Object> toMeta = copyTo.getMetadata(); - for(String key : fromMeta.keySet()) { - Object value = fromMeta.get(key); - if(value instanceof Serializable) - toMeta.put(key, SerializationUtil.cloneBySerialization(value)); - else //hope for the best - should be serializable - toMeta.put(key, value); - } - return copyTo; + return this.inIndex; + } + + private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) { + Map<String, Object> fromMeta = copyFrom.getMetadata(); + Map<String, Object> toMeta = copyTo.getMetadata(); + for(String key : fromMeta.keySet()) { + Object value = fromMeta.get(key); + if(value instanceof Serializable) + toMeta.put(key, SerializationUtil.cloneBySerialization(value)); + else //hope for the best - should be serializable + toMeta.put(key, value); } + return copyTo; + } - public long getStartedAt() { - return streamConfig.getStartedAt(); - } + public long getStartedAt() { + return streamConfig.getStartedAt(); + } - public void setStartedAt() { - streamConfig.setStartedAt(DateTime.now().getMillis()); - } + public void setStartedAt() { + streamConfig.setStartedAt(DateTime.now().getMillis()); + } - public String getStreamIdentifier() { - return streamConfig.getIdentifier(); - } + public String getStreamIdentifier() { + return streamConfig.getIdentifier(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java index e93ee1d..6df9767 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java @@ -28,67 +28,67 @@ import java.util.concurrent.Executor; @Deprecated public class LocalStreamProcessMonitorThread implements StatusCounterMonitorRunnable { - private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class); - private Executor executor; + private Executor executor; - private int seconds; + private int seconds; - private boolean run = true; + private boolean run = true; - public LocalStreamProcessMonitorThread(Executor executor, int delayInSeconds) { - this.executor = executor; - this.seconds = delayInSeconds; - } + public LocalStreamProcessMonitorThread(Executor executor, int delayInSeconds) { + this.executor = executor; + this.seconds = delayInSeconds; + } - @Override - public void shutdown(){ - this.run = false; - } + @Override + public void shutdown(){ + this.run = false; + } - @Override - public boolean isRunning() { - return this.run; - } + @Override + public boolean isRunning() { + return this.run; + } - @Override - public void run() - { - while(run){ - - /** - * - * Note: - * Quick class and method to let us see what is going on with the JVM. We need to make sure - * that everything is running with as little memory as possible. If we are generating a heap - * overflow, this will be very apparent by the information shown here. - */ - - MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - - String maxMemory = memoryUsage.getMax() == Long.MAX_VALUE ? "NO_LIMIT" : - humanReadableByteCount(memoryUsage.getMax(), true); - - String usedMemory = humanReadableByteCount(memoryUsage.getUsed(), true); - - LOGGER.debug("[monitor] Used Memory: {}, Max: {}", - usedMemory, - maxMemory); - - try - { - Thread.sleep(seconds*1000); - } - catch (InterruptedException e) - { } - } - } + @Override + public void run() + { + while(run){ + + /** + * + * Note: + * Quick class and method to let us see what is going on with the JVM. We need to make sure + * that everything is running with as little memory as possible. If we are generating a heap + * overflow, this will be very apparent by the information shown here. + */ + + MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + + String maxMemory = memoryUsage.getMax() == Long.MAX_VALUE ? "NO_LIMIT" : + humanReadableByteCount(memoryUsage.getMax(), true); + + String usedMemory = humanReadableByteCount(memoryUsage.getUsed(), true); + + LOGGER.debug("[monitor] Used Memory: {}, Max: {}", + usedMemory, + maxMemory); - public String humanReadableByteCount(long bytes, boolean si) { - int unit = si ? 1000 : 1024; - if (bytes < unit) return bytes + " B"; - int exp = (int) (Math.log(bytes) / Math.log(unit)); - String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp-1) + (si ? "" : "i"); - return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); + try + { + Thread.sleep(seconds*1000); + } + catch (InterruptedException e) + { } } + } + + public String humanReadableByteCount(long bytes, boolean si) { + int unit = si ? 1000 : 1024; + if (bytes < unit) return bytes + " B"; + int exp = (int) (Math.log(bytes) / Math.log(unit)); + String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp-1) + (si ? "" : "i"); + return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java index 5d4d8b5..9dc91d3 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java @@ -20,6 +20,6 @@ package org.apache.streams.local.tasks; @Deprecated public interface StatusCounterMonitorRunnable extends Runnable { - void shutdown(); - boolean isRunning(); + void shutdown(); + boolean isRunning(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java index c5413db..d59f0d6 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java @@ -19,60 +19,61 @@ package org.apache.streams.local.tasks; import org.apache.streams.core.DatumStatusCountable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Deprecated public class StatusCounterMonitorThread implements StatusCounterMonitorRunnable { - private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class); + private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class); - private DatumStatusCountable task; + private DatumStatusCountable task; - private int seconds; + private int seconds; - private boolean run = true; + private boolean run = true; - public StatusCounterMonitorThread(DatumStatusCountable task, int delayInSeconds) { - this.task = task; - this.seconds = delayInSeconds; - } + public StatusCounterMonitorThread(DatumStatusCountable task, int delayInSeconds) { + this.task = task; + this.seconds = delayInSeconds; + } - @Override - public void shutdown() { - this.run = false; - } + @Override + public void shutdown() { + this.run = false; + } - @Override - public boolean isRunning() { - return this.run; - } + @Override + public boolean isRunning() { + return this.run; + } - @Override - public void run() { - while(run) { + @Override + public void run() { + while(run) { - /** - * - * Note: - * Quick class and method to let us see what is going on with the JVM. We need to make sure - * that everything is running with as little memory as possible. If we are generating a heap - * overflow, this will be very apparent by the information shown here. - */ + /** + * + * Note: + * Quick class and method to let us see what is going on with the JVM. We need to make sure + * that everything is running with as little memory as possible. If we are generating a heap + * overflow, this will be very apparent by the information shown here. + */ - LOGGER.debug("{}: {} attempted, {} success, {} partial, {} failed, {} total", - task.getClass(), - task.getDatumStatusCounter().getAttempted(), - task.getDatumStatusCounter().getSuccess(), - task.getDatumStatusCounter().getPartial(), - task.getDatumStatusCounter().getFail(), - task.getDatumStatusCounter().getEmitted()); + LOGGER.debug("{}: {} attempted, {} success, {} partial, {} failed, {} total", + task.getClass(), + task.getDatumStatusCounter().getAttempted(), + task.getDatumStatusCounter().getSuccess(), + task.getDatumStatusCounter().getPartial(), + task.getDatumStatusCounter().getFail(), + task.getDatumStatusCounter().getEmitted()); - try { - Thread.sleep(seconds*1000); - } - catch (InterruptedException e){ - shutdown(); - } - } + try { + Thread.sleep(seconds*1000); + } + catch (InterruptedException e){ + shutdown(); + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java index 69cd5a5..473d2f4 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java @@ -21,9 +21,9 @@ package org.apache.streams.local.tasks; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.core.StreamsDatum; import org.apache.streams.local.counters.StreamsTaskCounter; + import org.apache.commons.lang.NotImplementedException; -import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -34,57 +34,57 @@ import java.util.concurrent.atomic.AtomicBoolean; @Deprecated public class StreamsMergeTask extends BaseStreamsTask { - private AtomicBoolean keepRunning; - private long sleepTime; + private AtomicBoolean keepRunning; + private long sleepTime; - public StreamsMergeTask() { - this(null); - } + public StreamsMergeTask() { + this(null); + } - public StreamsMergeTask(StreamsConfiguration streamConfig) { - super(streamConfig); - this.sleepTime = sleepTime; - this.keepRunning = new AtomicBoolean(true); - } + public StreamsMergeTask(StreamsConfiguration streamConfig) { + super(streamConfig); + this.sleepTime = sleepTime; + this.keepRunning = new AtomicBoolean(true); + } - @Override - public void stopTask() { - this.keepRunning.set(false); - } + @Override + public void stopTask() { + this.keepRunning.set(false); + } - @Override - public void setStreamConfig(StreamsConfiguration config) { + @Override + public void setStreamConfig(StreamsConfiguration config) { - } + } - @Override - public boolean isRunning() { - return false; - } + @Override + public boolean isRunning() { + return false; + } - @Override - public void run() { - while(this.keepRunning.get()) { - StreamsDatum datum = super.getNextDatum(); - if(datum != null) { - try { - super.addToOutgoingQueue(datum); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } - else { - try { - Thread.sleep(this.sleepTime); - } catch (InterruptedException e) { - this.keepRunning.set(false); - } - } + @Override + public void run() { + while(this.keepRunning.get()) { + StreamsDatum datum = super.getNextDatum(); + if(datum != null) { + try { + super.addToOutgoingQueue(datum); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } + } + else { + try { + Thread.sleep(this.sleepTime); + } catch (InterruptedException e) { + this.keepRunning.set(false); + } + } } + } - @Override - public void setStreamsTaskCounter(StreamsTaskCounter counter) { - throw new NotImplementedException(); - } + @Override + public void setStreamsTaskCounter(StreamsTaskCounter counter) { + throw new NotImplementedException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index fb97218..5c918b2 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@ -18,15 +18,22 @@ package org.apache.streams.local.tasks; -import com.google.common.util.concurrent.Uninterruptibles; import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.core.util.DatumUtils; import org.apache.streams.local.counters.StreamsTaskCounter; + +import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,136 +43,136 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumStatusCountable { - private final static Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterTask.class); - - private StreamsPersistWriter writer; - private AtomicBoolean keepRunning; - private StreamsConfiguration streamConfig; - private BlockingQueue<StreamsDatum> inQueue; - private AtomicBoolean isRunning; - private AtomicBoolean blocked; - private StreamsTaskCounter counter; - - private DatumStatusCounter statusCounter = new DatumStatusCounter(); - - @Override - public DatumStatusCounter getDatumStatusCounter() { - return this.statusCounter; - } - - - /** - * Default constructor. Uses default sleep of 500ms when inbound queue is empty. - * @param writer writer to execute in task - */ - public StreamsPersistWriterTask(StreamsPersistWriter writer) { - this(writer, null); - } - - /** - * - * @param writer writer to execute in task - * @param streamConfig stream config - */ - public StreamsPersistWriterTask(StreamsPersistWriter writer, StreamsConfiguration streamConfig) { - super(streamConfig); - this.streamConfig = super.streamConfig; - this.writer = writer; - this.keepRunning = new AtomicBoolean(true); - this.isRunning = new AtomicBoolean(true); - this.blocked = new AtomicBoolean(false); - } - - @Override - public boolean isWaiting() { - return this.inQueue.isEmpty() && this.blocked.get(); - } - - @Override - public void setStreamConfig(StreamsConfiguration config) { - this.streamConfig = config; - } - - @Override - public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { - this.inQueue = inputQueue; - } - - @Override - public boolean isRunning() { - return this.isRunning.get(); - } - - @Override - public void run() { + private final static Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterTask.class); + + private StreamsPersistWriter writer; + private AtomicBoolean keepRunning; + private StreamsConfiguration streamConfig; + private BlockingQueue<StreamsDatum> inQueue; + private AtomicBoolean isRunning; + private AtomicBoolean blocked; + private StreamsTaskCounter counter; + + private DatumStatusCounter statusCounter = new DatumStatusCounter(); + + @Override + public DatumStatusCounter getDatumStatusCounter() { + return this.statusCounter; + } + + + /** + * Default constructor. Uses default sleep of 500ms when inbound queue is empty. + * @param writer writer to execute in task + */ + public StreamsPersistWriterTask(StreamsPersistWriter writer) { + this(writer, null); + } + + /** + * + * @param writer writer to execute in task + * @param streamConfig stream config + */ + public StreamsPersistWriterTask(StreamsPersistWriter writer, StreamsConfiguration streamConfig) { + super(streamConfig); + this.streamConfig = super.streamConfig; + this.writer = writer; + this.keepRunning = new AtomicBoolean(true); + this.isRunning = new AtomicBoolean(true); + this.blocked = new AtomicBoolean(false); + } + + @Override + public boolean isWaiting() { + return this.inQueue.isEmpty() && this.blocked.get(); + } + + @Override + public void setStreamConfig(StreamsConfiguration config) { + this.streamConfig = config; + } + + @Override + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { + this.inQueue = inputQueue; + } + + @Override + public boolean isRunning() { + return this.isRunning.get(); + } + + @Override + public void run() { + try { + this.writer.prepare(this.streamConfig); + if(this.counter == null) { + this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); + } + while(this.keepRunning.get()) { + StreamsDatum datum = null; try { - this.writer.prepare(this.streamConfig); - if(this.counter == null) { - this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); - } - while(this.keepRunning.get()) { - StreamsDatum datum = null; - try { - this.blocked.set(true); - datum = this.inQueue.poll(5, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status."); - this.keepRunning.set(false); - if(!this.inQueue.isEmpty()) { - LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.writer.getClass().getName()); - } - Thread.currentThread().interrupt(); - } finally { - this.blocked.set(false); - } - if(datum != null) { - this.counter.incrementReceivedCount(); - try { - long startTime = System.currentTimeMillis(); - this.writer.write(datum); - this.counter.addTime(System.currentTimeMillis() - startTime); - statusCounter.incrementStatus(DatumStatus.SUCCESS); - } catch (Exception e) { - LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e); - this.keepRunning.set(false); // why do we shutdown on a failed write ? - statusCounter.incrementStatus(DatumStatus.FAIL); - DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass()); - this.counter.incrementErrorCount(); - } - } else { //datums should never be null - LOGGER.trace("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName()); - } - } - Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - } catch(Throwable e) { - LOGGER.error("Caught Throwable in Persist Writer {} : {}", this.writer.getClass().getSimpleName(), e); + this.blocked.set(true); + datum = this.inQueue.poll(5, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status."); + this.keepRunning.set(false); + if(!this.inQueue.isEmpty()) { + LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.writer.getClass().getName()); + } + Thread.currentThread().interrupt(); } finally { - Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - this.writer.cleanUp(); - this.isRunning.set(false); + this.blocked.set(false); } + if(datum != null) { + this.counter.incrementReceivedCount(); + try { + long startTime = System.currentTimeMillis(); + this.writer.write(datum); + this.counter.addTime(System.currentTimeMillis() - startTime); + statusCounter.incrementStatus(DatumStatus.SUCCESS); + } catch (Exception e) { + LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e); + this.keepRunning.set(false); // why do we shutdown on a failed write ? + statusCounter.incrementStatus(DatumStatus.FAIL); + DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass()); + this.counter.incrementErrorCount(); + } + } else { //datums should never be null + LOGGER.trace("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName()); + } + } + Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + } catch(Throwable e) { + LOGGER.error("Caught Throwable in Persist Writer {} : {}", this.writer.getClass().getSimpleName(), e); + } finally { + Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + this.writer.cleanUp(); + this.isRunning.set(false); } - - @Override - public void stopTask() { - this.keepRunning.set(false); - } - - - @Override - public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) { - throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setOutputQueue()"); - } - - @Override - public List<BlockingQueue<StreamsDatum>> getInputQueues() { - List<BlockingQueue<StreamsDatum>> queues = new LinkedList<>(); - queues.add(this.inQueue); - return queues; - } - - @Override - public void setStreamsTaskCounter(StreamsTaskCounter counter) { - this.counter = counter; - } + } + + @Override + public void stopTask() { + this.keepRunning.set(false); + } + + + @Override + public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) { + throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setOutputQueue()"); + } + + @Override + public List<BlockingQueue<StreamsDatum>> getInputQueues() { + List<BlockingQueue<StreamsDatum>> queues = new LinkedList<>(); + queues.add(this.inQueue); + return queues; + } + + @Override + public void setStreamsTaskCounter(StreamsTaskCounter counter) { + this.counter = counter; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index 137c7e1..8720c68 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@ -18,15 +18,21 @@ package org.apache.streams.local.tasks; -import com.google.common.collect.Maps; import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; import org.apache.streams.core.util.DatumUtils; import org.apache.streams.local.counters.StreamsTaskCounter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,139 +42,139 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatusCountable { - private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProcessorTask.class); - - - private StreamsProcessor processor; - private AtomicBoolean keepRunning; - private StreamsConfiguration streamConfig; - private BlockingQueue<StreamsDatum> inQueue; - private AtomicBoolean isRunning; - private AtomicBoolean blocked; - private StreamsTaskCounter counter; - - private DatumStatusCounter statusCounter = new DatumStatusCounter(); - - @Override - public DatumStatusCounter getDatumStatusCounter() { - return this.statusCounter; - } - - /** - * Default constructor, uses default sleep time of 500ms when inbound queue is empty - * @param processor process to run in task - */ - public StreamsProcessorTask(StreamsProcessor processor) { - this(processor, new StreamsConfiguration()); - } - - /** - * @param processor - * @param streamConfig - */ - public StreamsProcessorTask(StreamsProcessor processor, StreamsConfiguration streamConfig) { - super(streamConfig); - this.streamConfig = super.streamConfig; - this.processor = processor; - this.keepRunning = new AtomicBoolean(true); - this.isRunning = new AtomicBoolean(true); - this.blocked = new AtomicBoolean(true); - } - - @Override - public boolean isWaiting() { - return this.inQueue.isEmpty() && this.blocked.get(); - } - - @Override - public void stopTask() { - this.keepRunning.set(false); - } - - @Override - public void setStreamConfig(StreamsConfiguration config) { - this.streamConfig = config; - } - - @Override - public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { - this.inQueue = inputQueue; - } - - @Override - public boolean isRunning() { - return this.isRunning.get(); - } - - @Override - public void run() { + private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProcessorTask.class); + + + private StreamsProcessor processor; + private AtomicBoolean keepRunning; + private StreamsConfiguration streamConfig; + private BlockingQueue<StreamsDatum> inQueue; + private AtomicBoolean isRunning; + private AtomicBoolean blocked; + private StreamsTaskCounter counter; + + private DatumStatusCounter statusCounter = new DatumStatusCounter(); + + @Override + public DatumStatusCounter getDatumStatusCounter() { + return this.statusCounter; + } + + /** + * Default constructor, uses default sleep time of 500ms when inbound queue is empty + * @param processor process to run in task + */ + public StreamsProcessorTask(StreamsProcessor processor) { + this(processor, new StreamsConfiguration()); + } + + /** + * @param processor + * @param streamConfig + */ + public StreamsProcessorTask(StreamsProcessor processor, StreamsConfiguration streamConfig) { + super(streamConfig); + this.streamConfig = super.streamConfig; + this.processor = processor; + this.keepRunning = new AtomicBoolean(true); + this.isRunning = new AtomicBoolean(true); + this.blocked = new AtomicBoolean(true); + } + + @Override + public boolean isWaiting() { + return this.inQueue.isEmpty() && this.blocked.get(); + } + + @Override + public void stopTask() { + this.keepRunning.set(false); + } + + @Override + public void setStreamConfig(StreamsConfiguration config) { + this.streamConfig = config; + } + + @Override + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { + this.inQueue = inputQueue; + } + + @Override + public boolean isRunning() { + return this.isRunning.get(); + } + + @Override + public void run() { + try { + this.processor.prepare(this.streamConfig); + if(this.counter == null) { + this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); + } + while(this.keepRunning.get()) { + StreamsDatum datum = null; try { - this.processor.prepare(this.streamConfig); - if(this.counter == null) { - this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); - } - while(this.keepRunning.get()) { - StreamsDatum datum = null; - try { - this.blocked.set(true); - datum = this.inQueue.poll(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - } catch (InterruptedException ie) { - LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status."); - this.keepRunning.set(false); - if(!this.inQueue.isEmpty()) { - LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.processor.getClass().getName()); - } - Thread.currentThread().interrupt(); - } finally { - this.blocked.set(false); - } - if(datum != null) { - this.counter.incrementReceivedCount(); - try { - long startTime = System.currentTimeMillis(); - List<StreamsDatum> output = this.processor.process(datum); - this.counter.addTime(System.currentTimeMillis() - startTime); - if(output != null) { - for(StreamsDatum outDatum : output) { - super.addToOutgoingQueue(outDatum); - this.counter.incrementEmittedCount(); - statusCounter.incrementStatus(DatumStatus.SUCCESS); - } - } - } catch (InterruptedException ie) { - LOGGER.warn("Received InterruptedException, shutting down and re-applying interrupt status."); - this.keepRunning.set(false); - Thread.currentThread().interrupt(); - } catch (Throwable t) { - this.counter.incrementErrorCount(); - LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t); - statusCounter.incrementStatus(DatumStatus.FAIL); - //Add the error to the metadata, but keep processing - DatumUtils.addErrorToMetadata(datum, t, this.processor.getClass()); - } - } else { - LOGGER.trace("Removed NULL datum from queue at processor : {}", this.processor.getClass().getName()); - } - } - } catch(Throwable e) { - LOGGER.error("Caught Throwable in Processor {}", this.processor.getClass().getSimpleName(), e); + this.blocked.set(true); + datum = this.inQueue.poll(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status."); + this.keepRunning.set(false); + if(!this.inQueue.isEmpty()) { + LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.processor.getClass().getName()); + } + Thread.currentThread().interrupt(); } finally { - this.isRunning.set(false); - this.processor.cleanUp(); + this.blocked.set(false); } + if(datum != null) { + this.counter.incrementReceivedCount(); + try { + long startTime = System.currentTimeMillis(); + List<StreamsDatum> output = this.processor.process(datum); + this.counter.addTime(System.currentTimeMillis() - startTime); + if(output != null) { + for(StreamsDatum outDatum : output) { + super.addToOutgoingQueue(outDatum); + this.counter.incrementEmittedCount(); + statusCounter.incrementStatus(DatumStatus.SUCCESS); + } + } + } catch (InterruptedException ie) { + LOGGER.warn("Received InterruptedException, shutting down and re-applying interrupt status."); + this.keepRunning.set(false); + Thread.currentThread().interrupt(); + } catch (Throwable t) { + this.counter.incrementErrorCount(); + LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t); + statusCounter.incrementStatus(DatumStatus.FAIL); + //Add the error to the metadata, but keep processing + DatumUtils.addErrorToMetadata(datum, t, this.processor.getClass()); + } + } else { + LOGGER.trace("Removed NULL datum from queue at processor : {}", this.processor.getClass().getName()); + } + } + } catch(Throwable e) { + LOGGER.error("Caught Throwable in Processor {}", this.processor.getClass().getSimpleName(), e); + } finally { + this.isRunning.set(false); + this.processor.cleanUp(); } - - @Override - public List<BlockingQueue<StreamsDatum>> getInputQueues() { - List<BlockingQueue<StreamsDatum>> queues = new LinkedList<BlockingQueue<StreamsDatum>>(); - queues.add(this.inQueue); - return queues; - } - - @Override - public void setStreamsTaskCounter(StreamsTaskCounter counter) { - this.counter = counter; - } + } + + @Override + public List<BlockingQueue<StreamsDatum>> getInputQueues() { + List<BlockingQueue<StreamsDatum>> queues = new LinkedList<BlockingQueue<StreamsDatum>>(); + queues.add(this.inQueue); + return queues; + } + + @Override + public void setStreamsTaskCounter(StreamsTaskCounter counter) { + this.counter = counter; + } }