[05/53] [abbrv] git commit: adding platform-level status counters debugging data leak
adding platform-level status counters debugging data leak Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab5165ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab5165ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab5165ab Branch: refs/heads/master Commit: ab5165ab7f7247fdad27586aa589e560b12a5ef7 Parents: adb43b2 Author: Steve Blackmon sblack...@w2odigital.com Authored: Mon Mar 24 15:21:14 2014 -0500 Committer: Steve Blackmon sblack...@w2odigital.com Committed: Mon Mar 24 15:21:14 2014 -0500 -- .../streams/hdfs/WebHdfsPersistReader.java | 33 .../streams/hdfs/WebHdfsPersistReaderTask.java | 23 +- .../streams-provider-twitter/pom.xml| 2 +- .../provider/TwitterStreamConfigurator.java | 2 ++ .../twitter/provider/TwitterStreamProvider.java | 22 ++--- .../com/twitter/TwitterStreamConfiguration.json | 22 + .../src/main/resources/reference.conf | 3 +- .../apache/streams/core/DatumStatusCounter.java | 25 ++- streams-runtimes/streams-runtime-local/pom.xml | 17 ++ .../local/builders/LocalStreamBuilder.java | 25 +-- .../streams/local/builders/StreamComponent.java | 24 +++--- .../streams/local/tasks/BaseStreamsTask.java| 1 + .../tasks/LocalStreamProcessMonitorThread.java | 2 +- .../local/tasks/StreamsProviderTask.java| 16 +- 14 files changed, 170 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java -- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index b0d9904..3a6ff29 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -1,8 +1,6 @@ package org.apache.streams.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.Queues; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -11,9 +9,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.*; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +19,6 @@ import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; -import java.util.Collection; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; /** * Created by sblackmon on 2/28/14. 
*/ -public class WebHdfsPersistReader implements StreamsPersistReader { +public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable { public final static String STREAMS_ID = "WebHdfsPersistReader"; @@ -52,6 +47,9 @@ public class WebHdfsPersistReader implements StreamsPersistReader { private ExecutorService executor; +protected DatumStatusCounter countersTotal = new DatumStatusCounter(); +protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) { this.hdfsConfiguration = hdfsConfiguration; } @@ -130,7 +128,8 @@ public class WebHdfsPersistReader implements StreamsPersistReader { } catch (IOException e) { e.printStackTrace(); } -persistQueue = new LinkedBlockingQueue<StreamsDatum>(1); +persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(1)); +//persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue()); executor = Executors.newSingleThreadExecutor(); } @@ -154,12 +153,16 @@ public class WebHdfsPersistReader implements StreamsPersistReader { @Override public StreamsResultSet readCurrent() { -
git commit: adding platform-level status counters debugging data leak
Repository: incubator-streams Updated Branches: refs/heads/springcleaning adb43b295 - ab5165ab7 adding platform-level status counters debugging data leak Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab5165ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab5165ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab5165ab Branch: refs/heads/springcleaning Commit: ab5165ab7f7247fdad27586aa589e560b12a5ef7 Parents: adb43b2 Author: Steve Blackmon sblack...@w2odigital.com Authored: Mon Mar 24 15:21:14 2014 -0500 Committer: Steve Blackmon sblack...@w2odigital.com Committed: Mon Mar 24 15:21:14 2014 -0500 -- .../streams/hdfs/WebHdfsPersistReader.java | 33 .../streams/hdfs/WebHdfsPersistReaderTask.java | 23 +- .../streams-provider-twitter/pom.xml| 2 +- .../provider/TwitterStreamConfigurator.java | 2 ++ .../twitter/provider/TwitterStreamProvider.java | 22 ++--- .../com/twitter/TwitterStreamConfiguration.json | 22 + .../src/main/resources/reference.conf | 3 +- .../apache/streams/core/DatumStatusCounter.java | 25 ++- streams-runtimes/streams-runtime-local/pom.xml | 17 ++ .../local/builders/LocalStreamBuilder.java | 25 +-- .../streams/local/builders/StreamComponent.java | 24 +++--- .../streams/local/tasks/BaseStreamsTask.java| 1 + .../tasks/LocalStreamProcessMonitorThread.java | 2 +- .../local/tasks/StreamsProviderTask.java| 16 +- 14 files changed, 170 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java -- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index b0d9904..3a6ff29 100644 --- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -1,8 +1,6 @@ package org.apache.streams.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.Queues; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -11,9 +9,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.*; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +19,6 @@ import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; -import java.util.Collection; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; /** * Created by sblackmon on 2/28/14. 
*/ -public class WebHdfsPersistReader implements StreamsPersistReader { +public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable { public final static String STREAMS_ID = "WebHdfsPersistReader"; @@ -52,6 +47,9 @@ public class WebHdfsPersistReader implements StreamsPersistReader { private ExecutorService executor; +protected DatumStatusCounter countersTotal = new DatumStatusCounter(); +protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) { this.hdfsConfiguration = hdfsConfiguration; } @@ -130,7 +128,8 @@ public class WebHdfsPersistReader implements StreamsPersistReader { } catch (IOException e) { e.printStackTrace(); } -persistQueue = new LinkedBlockingQueue<StreamsDatum>(1); +persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(1)); +//persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue()); executor = Executors.newSingleThreadExecutor(); } @@ -154,12 +153,16 @@ public class WebHdfsPersistReader implements