Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-252
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c466a2c5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c466a2c5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c466a2c5 Branch: refs/heads/master Commit: c466a2c532c73ec0e09c6639c43b4c601964ea75 Parents: 9b9200e 536f85d Author: Robert Douglas <rdoug...@w2ogroup.com> Authored: Thu Dec 11 16:37:02 2014 -0600 Committer: Robert Douglas <rdoug...@w2ogroup.com> Committed: Thu Dec 11 16:37:02 2014 -0600 ---------------------------------------------------------------------- pom.xml | 21 +- streams-components/pom.xml | 1 + streams-components/streams-converters/pom.xml | 152 +++++++++++ .../converter/ActivityConverterProcessor.java | 262 +++++++++++++++++++ .../converter/BaseDocumentClassifier.java | 83 ++++++ .../BaseObjectNodeActivityConverter.java | 93 +++++++ .../converter/BaseStringActivityConverter.java | 93 +++++++ .../converter/TypeConverterProcessor.java | 91 +++++++ .../streams/converter/TypeConverterUtil.java | 52 ++++ ...ActivityConverterProcessorConfiguration.json | 23 ++ .../BaseActivityConverterProcessorTest.java | 92 +++++++ .../converter/test/CustomActivityConverter.java | 72 +++++ .../CustomActivityConverterProcessorTest.java | 103 ++++++++ .../test/CustomDocumentClassifier.java | 75 ++++++ .../streams/converter/test/CustomType.java | 37 +++ .../test/TypeConverterProcessorTest.java | 190 ++++++++++++++ .../org/apache/streams/s3/S3Configurator.java | 4 +- .../org/apache/streams/s3/S3PersistWriter.java | 38 +-- .../apache/streams/s3/S3PersistWriterTest.java | 62 +++++ .../apache/streams/core/util/DatumUtils.java | 12 + .../tasks/BroadcastMonitorThread.java | 3 + streams-pojo/pom.xml | 19 ++ .../apache/streams/data/ActivityConverter.java | 87 ++++++ .../apache/streams/data/ActivitySerializer.java | 4 +- .../apache/streams/data/DocumentClassifier.java | 39 +++ .../apache/streams/data/util/ActivityUtil.java | 7 + .../exceptions/ActivityConversionException.java | 42 +++ .../jackson/StreamsDateTimeDeserializer.java | 10 +- .../streams/jackson/StreamsDateTimeFormat.java | 12 + .../jackson/StreamsDateTimeSerializer.java | 5 +- .../streams/jackson/StreamsJacksonMapper.java | 20 +- .../streams/jackson/StreamsJacksonModule.java | 35 ++- .../data/data/util/CustomDateTimeFormat.java | 14 + .../data/util/CustomDateTimeFormatTest.java | 61 +++++ .../data/data/util/DateTimeSerDeTest.java | 3 +- .../streams/data/data/util/JsonUtilTest.java | 31 --- .../data/data/util/RFC3339UtilsTest.java | 3 + .../streams/local/builders/StreamComponent.java | 13 + .../local/tasks/StreamsPersistWriterTask.java | 3 + .../local/tasks/StreamsProcessorTask.java | 3 + 40 files changed, 1912 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c466a2c5/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java ---------------------------------------------------------------------- diff --cc streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java index 6f6dda5,6f6dda5..ea7edd1 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java @@@ -123,8 -123,8 +123,11 @@@ public class BroadcastMonitorThread ext Thread.sleep(waitTime); } catch (InterruptedException e) { LOGGER.error("Interrupted!: {}", e); ++ Thread.currentThread().interrupt(); ++ this.keepRunning = false; } catch (Exception e) { LOGGER.error("Exception: {}", e); ++ this.keepRunning = false; } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c466a2c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java ---------------------------------------------------------------------- diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index 050e297,050e297..5d75368 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@@ -114,6 -114,6 +114,9 @@@ public class StreamsPersistWriterTask e } catch (InterruptedException ie) { LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status."); this.keepRunning.set(false); ++ if(!this.inQueue.isEmpty()) { ++ LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.writer.getClass().getName()); ++ } Thread.currentThread().interrupt(); } finally { this.blocked.set(false); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c466a2c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java ---------------------------------------------------------------------- diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index 2ec6336,2ec6336..d03dfa3 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@@ -123,6 -123,6 +123,9 @@@ public class StreamsProcessorTask exten } catch (InterruptedException ie) { LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status."); this.keepRunning.set(false); ++ if(!this.inQueue.isEmpty()) { ++ LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.processor.getClass().getName()); ++ } Thread.currentThread().interrupt(); } finally { this.blocked.set(false);