Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-252


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c466a2c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c466a2c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c466a2c5

Branch: refs/heads/master
Commit: c466a2c532c73ec0e09c6639c43b4c601964ea75
Parents: 9b9200e 536f85d
Author: Robert Douglas <rdoug...@w2ogroup.com>
Authored: Thu Dec 11 16:37:02 2014 -0600
Committer: Robert Douglas <rdoug...@w2ogroup.com>
Committed: Thu Dec 11 16:37:02 2014 -0600

----------------------------------------------------------------------
 pom.xml                                         |  21 +-
 streams-components/pom.xml                      |   1 +
 streams-components/streams-converters/pom.xml   | 152 +++++++++++
 .../converter/ActivityConverterProcessor.java   | 262 +++++++++++++++++++
 .../converter/BaseDocumentClassifier.java       |  83 ++++++
 .../BaseObjectNodeActivityConverter.java        |  93 +++++++
 .../converter/BaseStringActivityConverter.java  |  93 +++++++
 .../converter/TypeConverterProcessor.java       |  91 +++++++
 .../streams/converter/TypeConverterUtil.java    |  52 ++++
 ...ActivityConverterProcessorConfiguration.json |  23 ++
 .../BaseActivityConverterProcessorTest.java     |  92 +++++++
 .../converter/test/CustomActivityConverter.java |  72 +++++
 .../CustomActivityConverterProcessorTest.java   | 103 ++++++++
 .../test/CustomDocumentClassifier.java          |  75 ++++++
 .../streams/converter/test/CustomType.java      |  37 +++
 .../test/TypeConverterProcessorTest.java        | 190 ++++++++++++++
 .../org/apache/streams/s3/S3Configurator.java   |   4 +-
 .../org/apache/streams/s3/S3PersistWriter.java  |  38 +--
 .../apache/streams/s3/S3PersistWriterTest.java  |  62 +++++
 .../apache/streams/core/util/DatumUtils.java    |  12 +
 .../tasks/BroadcastMonitorThread.java           |   3 +
 streams-pojo/pom.xml                            |  19 ++
 .../apache/streams/data/ActivityConverter.java  |  87 ++++++
 .../apache/streams/data/ActivitySerializer.java |   4 +-
 .../apache/streams/data/DocumentClassifier.java |  39 +++
 .../apache/streams/data/util/ActivityUtil.java  |   7 +
 .../exceptions/ActivityConversionException.java |  42 +++
 .../jackson/StreamsDateTimeDeserializer.java    |  10 +-
 .../streams/jackson/StreamsDateTimeFormat.java  |  12 +
 .../jackson/StreamsDateTimeSerializer.java      |   5 +-
 .../streams/jackson/StreamsJacksonMapper.java   |  20 +-
 .../streams/jackson/StreamsJacksonModule.java   |  35 ++-
 .../data/data/util/CustomDateTimeFormat.java    |  14 +
 .../data/util/CustomDateTimeFormatTest.java     |  61 +++++
 .../data/data/util/DateTimeSerDeTest.java       |   3 +-
 .../streams/data/data/util/JsonUtilTest.java    |  31 ---
 .../data/data/util/RFC3339UtilsTest.java        |   3 +
 .../streams/local/builders/StreamComponent.java |  13 +
 .../local/tasks/StreamsPersistWriterTask.java   |   3 +
 .../local/tasks/StreamsProcessorTask.java       |   3 +
 40 files changed, 1912 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c466a2c5/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --cc streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 6f6dda5,6f6dda5..ea7edd1
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@@ -123,8 -123,8 +123,11 @@@ public class BroadcastMonitorThread ext
                  Thread.sleep(waitTime);
              } catch (InterruptedException e) {
                  LOGGER.error("Interrupted!: {}", e);
++                Thread.currentThread().interrupt();
++                this.keepRunning = false;
              } catch (Exception e) {
                  LOGGER.error("Exception: {}", e);
++                this.keepRunning = false;
              }
          }
      }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c466a2c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 050e297,050e297..5d75368
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@@ -114,6 -114,6 +114,9 @@@ public class StreamsPersistWriterTask e
                  } catch (InterruptedException ie) {
                      LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status.");
                      this.keepRunning.set(false);
++                    if(!this.inQueue.isEmpty()) {
++                        LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.writer.getClass().getName());
++                    }
                      Thread.currentThread().interrupt();
                  } finally {
                      this.blocked.set(false);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c466a2c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 2ec6336,2ec6336..d03dfa3
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@@ -123,6 -123,6 +123,9 @@@ public class StreamsProcessorTask exten
                  } catch (InterruptedException ie) {
                      LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status.");
                      this.keepRunning.set(false);
++                    if(!this.inQueue.isEmpty()) {
++                        LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.processor.getClass().getName());
++                    }
                      Thread.currentThread().interrupt();
                  } finally {
                      this.blocked.set(false);

Reply via email to