[05/53] [abbrv] git commit: adding platform-level status counters debugging data leak

2014-04-17 Thread sblackmon
adding platform-level status counters
debugging data leak


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab5165ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab5165ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab5165ab

Branch: refs/heads/master
Commit: ab5165ab7f7247fdad27586aa589e560b12a5ef7
Parents: adb43b2
Author: Steve Blackmon sblack...@w2odigital.com
Authored: Mon Mar 24 15:21:14 2014 -0500
Committer: Steve Blackmon sblack...@w2odigital.com
Committed: Mon Mar 24 15:21:14 2014 -0500

--
 .../streams/hdfs/WebHdfsPersistReader.java  | 33 
 .../streams/hdfs/WebHdfsPersistReaderTask.java  | 23 +-
 .../streams-provider-twitter/pom.xml|  2 +-
 .../provider/TwitterStreamConfigurator.java |  2 ++
 .../twitter/provider/TwitterStreamProvider.java | 22 ++---
 .../com/twitter/TwitterStreamConfiguration.json | 22 +
 .../src/main/resources/reference.conf   |  3 +-
 .../apache/streams/core/DatumStatusCounter.java | 25 ++-
 streams-runtimes/streams-runtime-local/pom.xml  | 17 ++
 .../local/builders/LocalStreamBuilder.java  | 25 +--
 .../streams/local/builders/StreamComponent.java | 24 +++---
 .../streams/local/tasks/BaseStreamsTask.java|  1 +
 .../tasks/LocalStreamProcessMonitorThread.java  |  2 +-
 .../local/tasks/StreamsProviderTask.java| 16 +-
 14 files changed, 170 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
--
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index b0d9904..3a6ff29 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -1,8 +1,6 @@
 package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -11,9 +9,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,7 +19,6 @@ import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -32,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * Created by sblackmon on 2/28/14.
  */
-public class WebHdfsPersistReader implements StreamsPersistReader {
+public class WebHdfsPersistReader implements StreamsPersistReader, 
DatumStatusCountable {
 
 public final static String STREAMS_ID = WebHdfsPersistReader;
 
@@ -52,6 +47,9 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader {
 
 private ExecutorService executor;
 
+protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+
 public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
 this.hdfsConfiguration = hdfsConfiguration;
 }
@@ -130,7 +128,8 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader {
 } catch (IOException e) {
 e.printStackTrace();
 }
-persistQueue = new LinkedBlockingQueueStreamsDatum(1);
+persistQueue = Queues.synchronizedQueue(new 
LinkedBlockingQueueStreamsDatum(1));
+//persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
 executor = Executors.newSingleThreadExecutor();
 }
 
@@ -154,12 +153,16 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader {
 @Override
 public StreamsResultSet readCurrent() {
 
-

git commit: adding platform-level status counters debugging data leak

2014-03-24 Thread sblackmon
Repository: incubator-streams
Updated Branches:
  refs/heads/springcleaning adb43b295 - ab5165ab7


adding platform-level status counters
debugging data leak


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab5165ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab5165ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab5165ab

Branch: refs/heads/springcleaning
Commit: ab5165ab7f7247fdad27586aa589e560b12a5ef7
Parents: adb43b2
Author: Steve Blackmon sblack...@w2odigital.com
Authored: Mon Mar 24 15:21:14 2014 -0500
Committer: Steve Blackmon sblack...@w2odigital.com
Committed: Mon Mar 24 15:21:14 2014 -0500

--
 .../streams/hdfs/WebHdfsPersistReader.java  | 33 
 .../streams/hdfs/WebHdfsPersistReaderTask.java  | 23 +-
 .../streams-provider-twitter/pom.xml|  2 +-
 .../provider/TwitterStreamConfigurator.java |  2 ++
 .../twitter/provider/TwitterStreamProvider.java | 22 ++---
 .../com/twitter/TwitterStreamConfiguration.json | 22 +
 .../src/main/resources/reference.conf   |  3 +-
 .../apache/streams/core/DatumStatusCounter.java | 25 ++-
 streams-runtimes/streams-runtime-local/pom.xml  | 17 ++
 .../local/builders/LocalStreamBuilder.java  | 25 +--
 .../streams/local/builders/StreamComponent.java | 24 +++---
 .../streams/local/tasks/BaseStreamsTask.java|  1 +
 .../tasks/LocalStreamProcessMonitorThread.java  |  2 +-
 .../local/tasks/StreamsProviderTask.java| 16 +-
 14 files changed, 170 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
--
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index b0d9904..3a6ff29 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -1,8 +1,6 @@
 package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -11,9 +9,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,7 +19,6 @@ import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -32,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * Created by sblackmon on 2/28/14.
  */
-public class WebHdfsPersistReader implements StreamsPersistReader {
+public class WebHdfsPersistReader implements StreamsPersistReader, 
DatumStatusCountable {
 
 public final static String STREAMS_ID = WebHdfsPersistReader;
 
@@ -52,6 +47,9 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader {
 
 private ExecutorService executor;
 
+protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+
 public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
 this.hdfsConfiguration = hdfsConfiguration;
 }
@@ -130,7 +128,8 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader {
 } catch (IOException e) {
 e.printStackTrace();
 }
-persistQueue = new LinkedBlockingQueueStreamsDatum(1);
+persistQueue = Queues.synchronizedQueue(new 
LinkedBlockingQueueStreamsDatum(1));
+//persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
 executor = Executors.newSingleThreadExecutor();
 }
 
@@ -154,12 +153,16 @@ public class WebHdfsPersistReader implements