http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
index 17fde37..91d487e 100644
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
@@ -18,6 +18,9 @@
 
 package org.apache.streams.moreover;
 
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.Activity;
+
 import com.fasterxml.jackson.databind.AnnotationIntrospector;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -25,9 +28,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
 import com.moreover.api.Article;
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.moreover.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,64 +35,64 @@ import java.io.IOException;
 import java.util.List;
 
 /**
- * Deserializes Moreover JSON format into Activities
+ * Deserializes Moreover JSON format into Activities.
  */
 public class MoreoverJsonActivitySerializer implements 
ActivitySerializer<String> {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class);
 
-    public MoreoverJsonActivitySerializer() {
-    }
+  public MoreoverJsonActivitySerializer() {
+  }
 
-    @Override
-    public String serializationFormat() {
-        return "application/json+vnd.moreover.com.v1";
-    }
+  @Override
+  public String serializationFormat() {
+    return "application/json+vnd.moreover.com.v1";
+  }
 
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to 
Moreover JSON");
-    }
+  @Override
+  public String serialize(Activity deserialized) {
+    throw new UnsupportedOperationException("Cannot currently serialize to 
Moreover JSON");
+  }
 
-    @Override
-    public Activity deserialize(String serialized) {
-        serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
-
-        LOGGER.debug(serialized);
-
-        ObjectMapper mapper = new ObjectMapper();
-        AnnotationIntrospector introspector = new 
JaxbAnnotationIntrospector(mapper.getTypeFactory());
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.FALSE);
-        mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, 
Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
-        
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
-        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
-        Article article;
-        try {
-            ObjectNode node = (ObjectNode)mapper.readTree(serialized);
-            node.remove("tags");
-            node.remove("locations");
-            node.remove("companies");
-            node.remove("topics");
-            node.remove("media");
-            node.remove("outboundUrls");
-            ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed");
-            jsonNodes.remove("editorialTopics");
-            jsonNodes.remove("tags");
-            jsonNodes.remove("autoTopics");
-            article = mapper.convertValue(node, Article.class);
-        } catch (IOException e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-        return MoreoverUtils.convert(article);
-    }
+  @Override
+  public Activity deserialize(String serialized) {
+    serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
 
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        throw new NotImplementedException("Not currently implemented");
+    LOGGER.debug(serialized);
+
+    ObjectMapper mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new 
JaxbAnnotationIntrospector(mapper.getTypeFactory());
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.FALSE);
+    mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, 
Boolean.FALSE);
+    mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
+    
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
+    mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
+
+    Article article;
+    try {
+      ObjectNode node = (ObjectNode)mapper.readTree(serialized);
+      node.remove("tags");
+      node.remove("locations");
+      node.remove("companies");
+      node.remove("topics");
+      node.remove("media");
+      node.remove("outboundUrls");
+      ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed");
+      jsonNodes.remove("editorialTopics");
+      jsonNodes.remove("tags");
+      jsonNodes.remove("autoTopics");
+      article = mapper.convertValue(node, Article.class);
+    } catch (IOException ex) {
+      throw new IllegalArgumentException("Unable to deserialize", ex);
     }
+    return MoreoverUtils.convert(article);
+  }
+
+  @Override
+  public List<Activity> deserializeAll(List<String> serializedList) {
+    throw new NotImplementedException("Not currently implemented");
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
index 78d8e9d..2ab6ee4 100644
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
@@ -18,14 +18,6 @@
 
 package org.apache.streams.moreover;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.*;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
@@ -33,6 +25,17 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,136 +45,155 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Streams Provider for the Moreover Metabase API
- *
- *  To use from command line:
- *
- *  Supply configuration similar to src/test/resources/rss.conf
- *
- *  Launch using:
- *
- *  mvn exec:java 
-Dexec.mainClass=org.apache.streams.moreover.MoreoverProvider 
-Dexec.args="rss.conf articles.json"
+ * Streams Provider for the Moreover Metabase API.
  */
 public class MoreoverProvider implements StreamsProvider {
 
-    public final static String STREAMS_ID = "MoreoverProvider";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(MoreoverProvider.class);
-
-    protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<StreamsDatum>();
+  public static final String STREAMS_ID = "MoreoverProvider";
 
-    private List<MoreoverKeyData> keys;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MoreoverProvider.class);
 
-    private MoreoverConfiguration config;
+  protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<StreamsDatum>();
 
-    private ExecutorService executor;
+  private List<MoreoverKeyData> keys;
 
-    public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
-        this.config = moreoverConfiguration;
-        this.keys = Lists.newArrayList();
-        for( MoreoverKeyData apiKey : config.getApiKeys()) {
-            this.keys.add(apiKey);
-        }
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    public void startStream() {
+  private MoreoverConfiguration config;
 
-        for(MoreoverKeyData key : keys) {
-            MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), 
key.getKey(), this.providerQueue, key.getStartingSequence());
-            executor.submit(new Thread(task));
-            LOGGER.info("Started producer for {}", key.getKey());
-        }
+  private ExecutorService executor;
 
+  /**
+   * MoreoverProvider constructor.
+   * @param moreoverConfiguration MoreoverConfiguration
+   */
+  public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
+    this.config = moreoverConfiguration;
+    this.keys = Lists.newArrayList();
+    for ( MoreoverKeyData apiKey : config.getApiKeys()) {
+      this.keys.add(apiKey);
     }
+  }
 
-    @Override
-    public synchronized StreamsResultSet readCurrent() {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        LOGGER.debug("readCurrent: {}", providerQueue.size());
-
-        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
-        Iterators.addAll(currentIterator, providerQueue.iterator());
-
-        StreamsResultSet current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
-
-        providerQueue.clear();
-
-        return current;
-    }
+  @Override
+  public void startStream() {
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
+    for (MoreoverKeyData key : keys) {
+      MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), 
key.getKey(), this.providerQueue, key.getStartingSequence());
+      executor.submit(new Thread(task));
+      LOGGER.info("Started producer for {}", key.getKey());
     }
 
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return !executor.isShutdown() && !executor.isTerminated();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        LOGGER.debug("Prepare");
-        executor = Executors.newSingleThreadExecutor();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
-        MoreoverConfiguration config = new 
ComponentConfigurator<>(MoreoverConfiguration.class).detectConfiguration(typesafe,
 "rss");
-        MoreoverProvider provider = new MoreoverProvider(config);
-
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    json = mapper.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+  }
+
+  @Override
+  public synchronized StreamsResultSet readCurrent() {
+
+    LOGGER.debug("readCurrent: {}", providerQueue.size());
+
+    Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+    Iterators.addAll(currentIterator, providerQueue.iterator());
+
+    StreamsResultSet current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+
+    providerQueue.clear();
+
+    return current;
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !executor.isShutdown() && !executor.isTerminated();
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    LOGGER.debug("Prepare");
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  @Override
+  public void cleanUp() {
+
+  }
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply configuration similar to src/test/resources/rss.conf
+   *
+   * <p/>
+   * Launch using:
+   *
+   * <p/>
+   * mvn exec:java 
-Dexec.mainClass=org.apache.streams.moreover.MoreoverProvider 
-Dexec.args="rss.conf articles.json"
+   *
+   * @param args args
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+    MoreoverConfiguration config = new 
ComponentConfigurator<>(MoreoverConfiguration.class).detectConfiguration(typesafe,
 "rss");
+    MoreoverProvider provider = new MoreoverProvider(config);
+
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
+        try {
+          json = mapper.writeValueAsString(datum.getDocument());
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
+        }
+      }
     }
+    while ( provider.isRunning() );
+    provider.cleanUp();
+    outStream.flush();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
index ad92d73..88aec81 100644
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
@@ -18,74 +18,84 @@
 
 package org.apache.streams.moreover;
 
-import com.google.common.collect.ImmutableSet;
 import org.apache.streams.core.StreamsDatum;
+
+import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
 
 /**
- * Task to pull from the Morever API
+ * Task that pulls from the Morever API on behalf of MoreoverProvider.
  */
 public class MoreoverProviderTask implements Runnable {
 
-    public static final int LATENCY = 10;
-    public static final int REQUIRED_LATENCY = LATENCY * 1000;
-    private static Logger logger = 
LoggerFactory.getLogger(MoreoverProviderTask.class);
-
-    private String lastSequence;
-    private final String apiKey;
-    private final String apiId;
-    private final Queue<StreamsDatum> results;
-    private final MoreoverClient moClient;
-    private boolean started = false;
+  public static final int LATENCY = 10;
+  public static final int REQUIRED_LATENCY = LATENCY * 1000;
+  private static Logger logger = 
LoggerFactory.getLogger(MoreoverProviderTask.class);
 
-    public MoreoverProviderTask(String apiId, String apiKey, 
Queue<StreamsDatum> results, String lastSequence) {
-        //logger.info("Constructed new task {} for {} {} {}", 
UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
-        this.apiId = apiId;
-        this.apiKey = apiKey;
-        this.results = results;
-        this.lastSequence = lastSequence;
-        this.moClient = new MoreoverClient(this.apiId, this.apiKey, 
this.lastSequence);
-        initializeClient(moClient);
-    }
+  private String lastSequence;
+  private final String apiKey;
+  private final String apiId;
+  private final Queue<StreamsDatum> results;
+  private final MoreoverClient moClient;
+  private boolean started = false;
 
-    @Override
-    public void run() {
-        while(true) {
-            try {
-                ensureTime(moClient);
-                MoreoverResult result = 
moClient.getArticlesAfter(lastSequence, 500);
-                started = true;
-                lastSequence = result.process().toString();
-                for(StreamsDatum entry : 
ImmutableSet.copyOf(result.iterator()))
-                    results.offer(entry);
-                logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, 
lastSequence);
+  /**
+   * MoreoverProviderTask constructor.
+   * @param apiId apiId
+   * @param apiKey apiKey
+   * @param results results
+   * @param lastSequence lastSequence
+   */
+  public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> 
results, String lastSequence) {
+    //logger.info("Constructed new task {} for {} {} {}", 
UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
+    this.apiId = apiId;
+    this.apiKey = apiKey;
+    this.results = results;
+    this.lastSequence = lastSequence;
+    this.moClient = new MoreoverClient(this.apiId, this.apiKey, 
this.lastSequence);
+    initializeClient(moClient);
+  }
 
-            } catch (Exception e) {
-                logger.error("Exception while polling moreover", e);
-            }
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        ensureTime(moClient);
+        MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
+        started = true;
+        lastSequence = result.process().toString();
+        for (StreamsDatum entry : ImmutableSet.copyOf(result.iterator())) {
+          results.offer(entry);
         }
+        logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
+
+      } catch (Exception ex) {
+        logger.error("Exception while polling moreover", ex);
+      }
     }
+  }
 
-    private void ensureTime(MoreoverClient moClient) {
-        try {
-            long gap = System.currentTimeMillis() - moClient.pullTime;
-            if (gap < REQUIRED_LATENCY)
-                Thread.sleep(REQUIRED_LATENCY - gap);
-        } catch (Exception e) {
-            logger.warn("Error sleeping for latency");
-        }
+  private void ensureTime(MoreoverClient moClient) {
+    try {
+      long gap = System.currentTimeMillis() - moClient.pullTime;
+      if (gap < REQUIRED_LATENCY) {
+        Thread.sleep(REQUIRED_LATENCY - gap);
+      }
+    } catch (Exception ex) {
+      logger.warn("Error sleeping for latency");
     }
+  }
 
-    private void initializeClient(MoreoverClient moClient) {
-        try {
-            moClient.getArticlesAfter(this.lastSequence, 2);
-        } catch (Exception e) {
-            logger.error("Failed to start stream, {}", this.apiKey);
-            logger.error("Exception : ", e);
-            throw new IllegalStateException("Unable to initialize stream", e);
-        }
+  private void initializeClient(MoreoverClient moClient) {
+    try {
+      moClient.getArticlesAfter(this.lastSequence, 2);
+    } catch (Exception ex) {
+      logger.error("Failed to start stream, {}", this.apiKey);
+      logger.error("Exception : ", ex);
+      throw new IllegalStateException("Unable to initialize stream", ex);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
index e07084f..589e647 100644
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
@@ -18,6 +18,8 @@
 
 package org.apache.streams.moreover;
 
+import org.apache.streams.core.StreamsDatum;
+
 import com.fasterxml.aalto.stax.InputFactoryImpl;
 import com.fasterxml.aalto.stax.OutputFactoryImpl;
 import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -29,7 +31,6 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper;
 import com.google.common.collect.Lists;
 import com.moreover.api.Article;
 import com.moreover.api.ArticlesResponse;
-import org.apache.streams.core.StreamsDatum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,163 +39,163 @@ import java.math.BigInteger;
 import java.util.Iterator;
 import java.util.List;
 
-
 public class MoreoverResult implements Iterable<StreamsDatum> {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(MoreoverResult.class);
-
-    private ObjectMapper mapper;
-    private XmlMapper xmlMapper;
-
-    private String xmlString;
-    private String jsonString;
-    private ArticlesResponse resultObject;
-    private ArticlesResponse.Articles articles;
-    private List<Article> articleArray;
-    private long start;
-    private long end;
-    private String clientId;
-    private BigInteger maxSequencedId = BigInteger.ZERO;
-
-    protected ArticlesResponse response;
-    protected List<StreamsDatum> list = Lists.newArrayList();
-
-    protected MoreoverResult(String clientId, String xmlString, long start, 
long end) {
-        this.xmlString = xmlString;
-        this.clientId = clientId;
-        this.start = start;
-        this.end = end;
-        XmlFactory f = new XmlFactory(new InputFactoryImpl(),
-                new OutputFactoryImpl());
-
-        JacksonXmlModule module = new JacksonXmlModule();
-
-        module.setDefaultUseWrapper(false);
-
-        xmlMapper = new XmlMapper(f, module);
-
-        xmlMapper
-                .configure(
-                        DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
-                        Boolean.TRUE);
-        xmlMapper
-                .configure(
-                        
DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
-                        Boolean.TRUE);
-        xmlMapper
-                .configure(
-                        DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
-                        Boolean.TRUE);
-        xmlMapper.configure(
-                DeserializationFeature.READ_ENUMS_USING_TO_STRING,
-                Boolean.TRUE);
-        xmlMapper.configure(
-                DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
-                Boolean.FALSE);
-
-        mapper = new ObjectMapper();
-
-        mapper
-                .configure(
-                        DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
-                        Boolean.TRUE);
-        mapper.configure(
-                DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
-                Boolean.TRUE);
-        mapper
-                .configure(
-                        DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
-                        Boolean.TRUE);
-        mapper.configure(
-                DeserializationFeature.READ_ENUMS_USING_TO_STRING,
-                Boolean.TRUE);
-
+  private static final Logger logger = 
LoggerFactory.getLogger(MoreoverResult.class);
+
+  private ObjectMapper mapper;
+  private XmlMapper xmlMapper;
+
+  private String xmlString;
+  private String jsonString;
+  private ArticlesResponse resultObject;
+  private ArticlesResponse.Articles articles;
+  private List<Article> articleArray;
+  private long start;
+  private long end;
+  private String clientId;
+  private BigInteger maxSequencedId = BigInteger.ZERO;
+
+  protected ArticlesResponse response;
+  protected List<StreamsDatum> list = Lists.newArrayList();
+
+  protected MoreoverResult(String clientId, String xmlString, long start, long 
end) {
+    this.xmlString = xmlString;
+    this.clientId = clientId;
+    this.start = start;
+    this.end = end;
+    XmlFactory xmlFactory = new XmlFactory(new InputFactoryImpl(),
+        new OutputFactoryImpl());
+
+    JacksonXmlModule module = new JacksonXmlModule();
+
+    module.setDefaultUseWrapper(false);
+
+    xmlMapper = new XmlMapper(xmlFactory, module);
+
+    xmlMapper
+        .configure(
+            DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
+            Boolean.TRUE);
+    xmlMapper
+        .configure(
+            DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
+            Boolean.TRUE);
+    xmlMapper
+        .configure(
+            DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
+            Boolean.TRUE);
+    xmlMapper.configure(
+        DeserializationFeature.READ_ENUMS_USING_TO_STRING,
+        Boolean.TRUE);
+    xmlMapper.configure(
+        DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+        Boolean.FALSE);
+
+    mapper = new ObjectMapper();
+
+    mapper
+        .configure(
+            DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
+            Boolean.TRUE);
+    mapper.configure(
+        DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
+        Boolean.TRUE);
+    mapper
+        .configure(
+            DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
+            Boolean.TRUE);
+    mapper.configure(
+        DeserializationFeature.READ_ENUMS_USING_TO_STRING,
+        Boolean.TRUE);
+
+  }
+
+  public String getClientId() {
+    return clientId;
+  }
+
+  public long getStart() {
+    return start;
+  }
+
+  public long getEnd() {
+    return end;
+  }
+
+  /**
+   * Process batch and
+   * @return max sequenceId.
+   */
+  public BigInteger process() {
+
+    try {
+      this.resultObject = xmlMapper.readValue(xmlString, 
ArticlesResponse.class);
+    } catch (JsonMappingException ex) {
+      // theory is this may not be fatal
+      this.resultObject = (ArticlesResponse) ex.getPath().get(0).getFrom();
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      logger.warn("Unable to process document:");
+      logger.warn(xmlString);
     }
 
-    public String getClientId() {
-        return clientId;
+    if ( this.resultObject.getStatus().equals("FAILURE")) {
+      logger.warn(this.resultObject.getStatus());
+      logger.warn(this.resultObject.getMessageCode());
+      logger.warn(this.resultObject.getUserMessage());
+      logger.warn(this.resultObject.getDeveloperMessage());
+    } else {
+      this.articles = resultObject.getArticles();
+      this.articleArray = articles.getArticle();
+
+      for (Article article : articleArray) {
+        BigInteger sequenceid = new BigInteger(article.getSequenceId());
+        list.add(new StreamsDatum(article, sequenceid));
+        logger.trace("Prior max sequence Id {} current candidate {}", 
this.maxSequencedId, sequenceid);
+        if (sequenceid.compareTo(this.maxSequencedId) > 0) {
+          this.maxSequencedId = sequenceid;
+        }
+      }
     }
 
-    public long getStart() {
-        return start;
-    }
+    return this.maxSequencedId;
+  }
 
-    public long getEnd() {
-        return end;
-    }
+  public String getXmlString() {
+    return this.xmlString;
+  }
 
-    public BigInteger process() {
-
-        try {
-            this.resultObject = xmlMapper.readValue(xmlString, 
ArticlesResponse.class);
-        } catch (JsonMappingException e) {
-            // theory is this may not be fatal
-            this.resultObject = (ArticlesResponse) 
e.getPath().get(0).getFrom();
-        } catch (Exception e) {
-            e.printStackTrace();
-            logger.warn("Unable to process document:");
-            logger.warn(xmlString);
-        }
+  public BigInteger getMaxSequencedId() {
+    return this.maxSequencedId;
+  }
 
-        if( this.resultObject.getStatus().equals("FAILURE"))
-        {
-            logger.warn(this.resultObject.getStatus());
-            logger.warn(this.resultObject.getMessageCode());
-            logger.warn(this.resultObject.getUserMessage());
-            logger.warn(this.resultObject.getDeveloperMessage());
-        }
-        else
-        {
-            this.articles = resultObject.getArticles();
-            this.articleArray = articles.getArticle();
-
-            for (Article article : articleArray) {
-                BigInteger sequenceid = new 
BigInteger(article.getSequenceId());
-                list.add(new StreamsDatum(article, sequenceid));
-                logger.trace("Prior max sequence Id {} current candidate {}", 
this.maxSequencedId, sequenceid);
-                if (sequenceid.compareTo(this.maxSequencedId) > 0) {
-                    this.maxSequencedId = sequenceid;
-                }
-            }
-        }
+  @Override
+  public Iterator<StreamsDatum> iterator() {
+    return list.iterator();
+  }
 
-        return this.maxSequencedId;
-    }
+  protected static class JsonStringIterator implements Iterator<Serializable> {
 
-    public String getXmlString() {
-        return this.xmlString;
-    }
+    private Iterator<Serializable> underlying;
 
-    public BigInteger getMaxSequencedId() {
-        return this.maxSequencedId;
+    protected JsonStringIterator(Iterator<Serializable> underlying) {
+      this.underlying = underlying;
     }
 
     @Override
-    public Iterator<StreamsDatum> iterator() {
-        return list.iterator();
+    public boolean hasNext() {
+      return underlying.hasNext();
     }
 
-    protected static class JsonStringIterator implements 
Iterator<Serializable> {
-
-        private Iterator<Serializable> underlying;
-
-        protected JsonStringIterator(Iterator<Serializable> underlying) {
-            this.underlying = underlying;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return underlying.hasNext();
-        }
-
-        @Override
-        public String next() {
-            return underlying.next().toString();
-        }
+    @Override
+    public String next() {
+      return underlying.next().toString();
+    }
 
-        @Override
-        public void remove() {
-            underlying.remove();
-        }
+    @Override
+    public void remove() {
+      underlying.remove();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
deleted file mode 100644
index 0a47bd1..0000000
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.moreover;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-
-import java.util.Queue;
-
-public class MoreoverResultSetWrapper extends StreamsResultSet {
-
-    public MoreoverResultSetWrapper(MoreoverResult underlying) {
-        super((Queue<StreamsDatum>)underlying);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
index 8a91281..f9a3595 100644
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
@@ -18,22 +18,19 @@
 
 package org.apache.streams.moreover;
 
-import com.moreover.api.Article;
-import com.moreover.api.Author;
-import com.moreover.api.AuthorPublishingPlatform;
-import com.moreover.api.Feed;
-import com.moreover.api.Source;
 import org.apache.streams.data.util.ActivityUtil;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Provider;
+
+import com.moreover.api.Article;
+import com.moreover.api.Author;
+import com.moreover.api.AuthorPublishingPlatform;
+import com.moreover.api.Feed;
+import com.moreover.api.Source;
 import org.joda.time.DateTime;
 
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -43,125 +40,156 @@ import static 
org.apache.streams.data.util.ActivityUtil.LANGUAGE_EXTENSION;
 import static org.apache.streams.data.util.ActivityUtil.LOCATION_EXTENSION;
 import static 
org.apache.streams.data.util.ActivityUtil.LOCATION_EXTENSION_COUNTRY;
 import static org.apache.streams.data.util.ActivityUtil.getObjectId;
-import static org.apache.streams.data.util.ActivityUtil.getProviderId;
 
 /**
- * Provides utilities for Moroever data
+ * Provides utilities for Moreover data.
  */
 public class MoreoverUtils {
-    private MoreoverUtils() {
-    }
-
-    public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
-
-    public static Activity convert(Article article) {
-        Activity activity = new Activity();
-        Source source = article.getSource();
-        activity.setActor(convert(article.getAuthor(), source.getName()));
-        activity.setProvider(convert(source));
-        activity.setTarget(convertTarget(source));
-        activity.setObject(convertObject(article));
-        activity.setPublished(DateTime.parse(article.getPublishedDate()));
-        activity.setContent(article.getContent());
-        activity.setTitle(article.getTitle());
-        activity.setVerb("posted");
-        fixActivityId(activity);
-        addLocationExtension(activity, source);
-        addLanguageExtension(activity, article);
-        activity.setLinks(convertLinks(article));
-        return activity;
-    }
 
-    private static void fixActivityId(Activity activity) {
-        if (activity.getId() != null && 
activity.getId().matches("\\{[a-z]*\\}")) {
-            activity.setId(null);
-        }
+  private MoreoverUtils() {
+  }
+
+  public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
+
+  /**
+   * convert article into Activity.
+   * @param article article
+   * @return Activity
+   */
+  public static Activity convert(Article article) {
+    Activity activity = new Activity();
+    Source source = article.getSource();
+    activity.setActor(convert(article.getAuthor(), source.getName()));
+    activity.setProvider(convert(source));
+    activity.setTarget(convertTarget(source));
+    activity.setObject(convertObject(article));
+    activity.setPublished(DateTime.parse(article.getPublishedDate()));
+    activity.setContent(article.getContent());
+    activity.setTitle(article.getTitle());
+    activity.setVerb("posted");
+    fixActivityId(activity);
+    addLocationExtension(activity, source);
+    addLanguageExtension(activity, article);
+    activity.setLinks(convertLinks(article));
+    return activity;
+  }
+
+  /**
+   * convert Source to Provider.
+   * @param source Source
+   * @return Provider
+   */
+  public static Provider convert(Source source) {
+    Provider provider = new Provider();
+    Feed feed = source.getFeed();
+    String display = getProviderId(feed);
+    
provider.setId(ActivityUtil.getProviderId(display.trim().toLowerCase().replace("
 ", "_")));
+    provider.setDisplayName(display);
+    provider.setUrl(feed.getUrl());
+    return provider;
+  }
+
+  /**
+   * convert Author and platformName to Actor.
+   * @param author Author
+   * @param platformName platformName
+   * @return $.actor
+   */
+  public static ActivityObject convert(Author author, String platformName) {
+    ActivityObject actor = new ActivityObject();
+    AuthorPublishingPlatform platform = author.getPublishingPlatform();
+    String userId = platform.getUserId();
+    if (userId != null) {
+      actor.setId(ActivityUtil.getPersonId(getProviderId(platformName), 
userId));
     }
-
-    private static List convertLinks(Article article) {
-        List<String> list = new LinkedList<>();
-        Article.OutboundUrls outboundUrls = article.getOutboundUrls();
-        if (outboundUrls != null) {
-            for (String url : outboundUrls.getOutboundUrl()) {
-                list.add(url);
-            }
-        }
-        return list;
-    }
-
-    public static ActivityObject convertTarget(Source source) {
-        ActivityObject object = new ActivityObject();
-        object.setUrl(source.getHomeUrl());
-        object.setDisplayName(source.getName());
-        return object;
-    }
-
-    public static ActivityObject convertObject(Article article) {
-        ActivityObject object = new ActivityObject();
-        object.setContent(article.getContent());
-        object.setSummary(article.getTitle());
-        object.setUrl(article.getOriginalUrl());
-        object.setObjectType(article.getDataFormat());
-        String type = article.getDataFormat().equals("text") ? "article" : 
article.getDataFormat();
-        object.setId(getObjectId(getProviderID(article.getSource().getFeed()), 
type, article.getId()));
-        object.setPublished(DateTime.parse(article.getPublishedDate()));
-        return object;
-    }
-
-    public static Provider convert(Source source) {
-        Provider provider = new Provider();
-        Feed feed = source.getFeed();
-        String display = getProviderID(feed);
-        provider.setId(getProviderId(display.trim().toLowerCase().replace(" ", 
"_")));
-        provider.setDisplayName(display);
-        provider.setUrl(feed.getUrl());
-        return provider;
+    actor.setDisplayName(author.getName());
+    actor.setUrl(author.getHomeUrl());
+    actor.setSummary(author.getDescription());
+    actor.setAdditionalProperty("email", author.getEmail());
+    return actor;
+  }
+
+  private static void fixActivityId(Activity activity) {
+    if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) {
+      activity.setId(null);
     }
-
-    public static ActivityObject convert(Author author, String platformName) {
-        ActivityObject actor = new ActivityObject();
-        AuthorPublishingPlatform platform = author.getPublishingPlatform();
-        String userId = platform.getUserId();
-        if (userId != null) 
actor.setId(ActivityUtil.getPersonId(getProviderID(platformName), userId));
-        actor.setDisplayName(author.getName());
-        actor.setUrl(author.getHomeUrl());
-        actor.setSummary(author.getDescription());
-        actor.setAdditionalProperty("email", author.getEmail());
-        return actor;
+  }
+
+  private static List convertLinks(Article article) {
+    List<String> list = new LinkedList<>();
+    Article.OutboundUrls outboundUrls = article.getOutboundUrls();
+    if (outboundUrls != null) {
+      for (String url : outboundUrls.getOutboundUrl()) {
+        list.add(url);
+      }
     }
-
-    public static void addLocationExtension(Activity activity, Source value) {
-        Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
-        String country = value.getLocation().getCountryCode() == null ? 
value.getLocation().getCountry() : value.getLocation().getCountryCode();
-        if (country != null) {
-            Map<String, Object> location = new HashMap<>();
-            location.put(LOCATION_EXTENSION_COUNTRY, country);
-            extensions.put(LOCATION_EXTENSION, location);
-        }
+    return list;
+  }
+
+  /**
+   * convertTarget.
+   * @param source source
+   * @return ActivityObject $.target
+   */
+  public static ActivityObject convertTarget(Source source) {
+    ActivityObject object = new ActivityObject();
+    object.setUrl(source.getHomeUrl());
+    object.setDisplayName(source.getName());
+    return object;
+  }
+
+  /**
+   * convertObject.
+   * @param article article
+   * @return ActivityObject $.object
+   */
+  public static ActivityObject convertObject(Article article) {
+    ActivityObject object = new ActivityObject();
+    object.setContent(article.getContent());
+    object.setSummary(article.getTitle());
+    object.setUrl(article.getOriginalUrl());
+    object.setObjectType(article.getDataFormat());
+    String type = article.getDataFormat().equals("text") ? "article" : 
article.getDataFormat();
+    object.setId(getObjectId(getProviderId(article.getSource().getFeed()), 
type, article.getId()));
+    object.setPublished(DateTime.parse(article.getPublishedDate()));
+    return object;
+  }
+
+  /**
+   * addLocationExtension.
+   * @param activity Activity
+   * @param source Source
+   */
+  public static void addLocationExtension(Activity activity, Source source) {
+    Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
+    String country = source.getLocation().getCountryCode() == null
+        ? source.getLocation().getCountry()
+        : source.getLocation().getCountryCode();
+    if (country != null) {
+      Map<String, Object> location = new HashMap<>();
+      location.put(LOCATION_EXTENSION_COUNTRY, country);
+      extensions.put(LOCATION_EXTENSION, location);
     }
-
-    public static void addLanguageExtension(Activity activity, Article value) {
-        Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
-        String language = value.getLanguage();
-        if (language != null) {
-            extensions.put(LANGUAGE_EXTENSION, language);
-        }
+  }
+
+  /**
+   * addLanguageExtension.
+   * @param activity Activity
+   * @param article Article
+   */
+  public static void addLanguageExtension(Activity activity, Article article) {
+    Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
+    String language = article.getLanguage();
+    if (language != null) {
+      extensions.put(LANGUAGE_EXTENSION, language);
     }
+  }
 
-    public static Date parse(String str) {
-        DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
-        try {
-            return fmt.parse(str);
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Invalid date format", e);
-        }
-    }
+  private static String getProviderId(Feed feed) {
+    return getProviderId(feed.getPublishingPlatform() == null ? 
feed.getMediaType() : feed.getPublishingPlatform());
+  }
 
-    private static String getProviderID(Feed feed) {
-        return getProviderID(feed.getPublishingPlatform() == null ? 
feed.getMediaType() : feed.getPublishingPlatform());
-    }
-
-    private static String getProviderID(String feed) {
-        return feed.toLowerCase().replace(" ", "_").trim();
-    }
+  private static String getProviderId(String feed) {
+    return feed.toLowerCase().replace(" ", "_").trim();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
index 4b7b3b0..fe4378c 100644
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
@@ -18,89 +18,89 @@
 
 package org.apache.streams.moreover;
 
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.Activity;
+
 import com.moreover.api.Article;
 import com.moreover.api.ArticlesResponse;
 import com.moreover.api.ObjectFactory;
 import org.apache.commons.lang.SerializationException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.moreover.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
 
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.util.LinkedList;
-import java.util.List;
 
 /**
- * Deserializes the Moreover Article XML and converts it to an instance of 
{@link Activity}
+ * Deserializes the Moreover Article XML and converts it to an instance of 
{@link Activity}.
  */
 public class MoreoverXmlActivitySerializer implements 
ActivitySerializer<String> {
 
-    //JAXBContext is threadsafe (supposedly)
-    private final JAXBContext articleContext;
-    private final JAXBContext articlesContext;
+  //JAXBContext is threadsafe (supposedly)
+  private final JAXBContext articleContext;
+  private final JAXBContext articlesContext;
 
-    public MoreoverXmlActivitySerializer() {
-        articleContext = createContext(Article.class);
-        articlesContext = createContext(ArticlesResponse.class);
-    }
+  public MoreoverXmlActivitySerializer() {
+    articleContext = createContext(Article.class);
+    articlesContext = createContext(ArticlesResponse.class);
+  }
 
-    @Override
-    public String serializationFormat() {
-        return "application/xml+vnd.moreover.com.v1";
-    }
+  @Override
+  public String serializationFormat() {
+    return "application/xml+vnd.moreover.com.v1";
+  }
 
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to 
Moreover");
-    }
+  @Override
+  public String serialize(Activity deserialized) {
+    throw new UnsupportedOperationException("Cannot currently serialize to 
Moreover");
+  }
 
-    @Override
-    public Activity deserialize(String serialized) {
-        Article article = deserializeMoreover(serialized);
-        return MoreoverUtils.convert(article);
-    }
+  @Override
+  public Activity deserialize(String serialized) {
+    Article article = deserializeMoreover(serialized);
+    return MoreoverUtils.convert(article);
+  }
 
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        List<Activity> activities = new LinkedList<Activity>();
-        for(String item : serializedList) {
-            ArticlesResponse response = deserializeMoreoverResponse(item);
-            for(Article article : response.getArticles().getArticle()) {
-                activities.add(MoreoverUtils.convert(article));
-            }
-        }
-        return activities;
+  @Override
+  public List<Activity> deserializeAll(List<String> serializedList) {
+    List<Activity> activities = new LinkedList<Activity>();
+    for (String item : serializedList) {
+      ArticlesResponse response = deserializeMoreoverResponse(item);
+      for (Article article : response.getArticles().getArticle()) {
+        activities.add(MoreoverUtils.convert(article));
+      }
     }
+    return activities;
+  }
 
-    private Article deserializeMoreover(String serialized){
-        try {
-            Unmarshaller unmarshaller = articleContext.createUnmarshaller();
-            return (Article) unmarshaller.unmarshal(new 
StringReader(serialized));
-        } catch (JAXBException e) {
-            throw new SerializationException("Unable to deserialize Moreover 
data", e);
-        }
+  private Article deserializeMoreover(String serialized) {
+    try {
+      Unmarshaller unmarshaller = articleContext.createUnmarshaller();
+      return (Article) unmarshaller.unmarshal(new StringReader(serialized));
+    } catch (JAXBException ex) {
+      throw new SerializationException("Unable to deserialize Moreover data", 
ex);
     }
+  }
 
-    private ArticlesResponse deserializeMoreoverResponse(String serialized){
-        try {
-            Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
-            return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new 
StringReader(serialized))).getValue();
-        } catch (JAXBException e) {
-            throw new SerializationException("Unable to deserialize Moreover 
data", e);
-        }
+  private ArticlesResponse deserializeMoreoverResponse(String serialized) {
+    try {
+      Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
+      return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new 
StringReader(serialized))).getValue();
+    } catch (JAXBException ex) {
+      throw new SerializationException("Unable to deserialize Moreover data", 
ex);
     }
+  }
 
-    private JAXBContext createContext(Class articleClass) {
-        JAXBContext context;
-        try {
-            context = 
JAXBContext.newInstance(articleClass.getPackage().getName(), 
ObjectFactory.class.getClassLoader());
-        } catch (JAXBException e) {
-            throw new IllegalStateException("Unable to create JAXB Context for 
Moreover data", e);
-        }
-        return context;
+  private JAXBContext createContext(Class articleClass) {
+    JAXBContext context;
+    try {
+      context = JAXBContext.newInstance(articleClass.getPackage().getName(), 
ObjectFactory.class.getClassLoader());
+    } catch (JAXBException ex) {
+      throw new IllegalStateException("Unable to create JAXB Context for 
Moreover data", ex);
     }
+    return context;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
index cdd5822..c9bd823 100644
--- 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
+++ 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
@@ -19,25 +19,35 @@
 package org.apache.streams.moreover;
 
 import org.apache.streams.pojo.json.Activity;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.util.regex.Pattern.matches;
-import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
 
+/**
+ * MoreoverTestUtil.
+ */
 public class MoreoverTestUtil {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(MoreoverTestUtil.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MoreoverTestUtil.class);
 
-    public static void test(Activity activity) {
-        assertThat(activity, is(not(nullValue())));
-        assertThat(activity.getActor(), is(not(nullValue())));
-        assertThat(activity.getObject(), is(not(nullValue())));
-        if(activity.getObject().getId() != null) {
-            assertThat(matches("id:.*:[a-z]*s:[a-zA-Z0-9]*", 
activity.getObject().getId()), is(true));
-        }
-        assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
-        LOGGER.debug(activity.getPublished().toString());
+  /**
+   *
+   * @param activity
+   */
+  public static void validate(Activity activity) {
+    assertThat(activity, is(not(nullValue())));
+    assertThat(activity.getActor(), is(not(nullValue())));
+    assertThat(activity.getObject(), is(not(nullValue())));
+    if(activity.getObject().getId() != null) {
+      assertThat(matches("id:.*:[a-z]*s:[a-zA-Z0-9]*", 
activity.getObject().getId()), is(true));
     }
+    assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
+    LOGGER.debug(activity.getPublished().toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
index 94ef097..b7ae076 100644
--- 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
+++ 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
@@ -18,13 +18,15 @@
 
 package org.apache.streams.moreover.test;
 
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.moreover.MoreoverJsonActivitySerializer;
+import org.apache.streams.moreover.MoreoverTestUtil;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.moreover.MoreoverJsonActivitySerializer;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -32,45 +34,48 @@ import java.io.InputStream;
 import java.io.StringWriter;
 import java.nio.charset.Charset;
 
-import static org.apache.streams.moreover.MoreoverTestUtil.test;
-
 /**
- * Tests ability to serialize moreover json Strings
+ * Tests ability to serialize moreover json Strings.
  */
 public class MoreoverJsonActivitySerializerIT {
-    JsonNode json;
-    ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
-    ObjectMapper mapper;
 
-    @Before
-    public void setup() throws Exception {
+  JsonNode json;
+  ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
+  ObjectMapper mapper;
 
-        StringWriter writer = new StringWriter();
-        InputStream resourceAsStream = 
this.getClass().getResourceAsStream("/moreover.json");
-        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+  /**
+   * Before.
+   * @throws Exception Exception
+   */
+  @Before
+  public void setup() throws Exception {
 
-        mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
-        
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
+    StringWriter writer = new StringWriter();
+    InputStream resourceAsStream = 
this.getClass().getResourceAsStream("/moreover.json");
+    IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
 
-        json = mapper.readValue(writer.toString(), JsonNode.class);
-    }
+    mapper = new ObjectMapper();
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.FALSE);
+    mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
+    
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
+
+    json = mapper.readValue(writer.toString(), JsonNode.class);
+  }
 
-    @Test
-    public void loadData() throws Exception {
-        for (JsonNode item : json) {
-            test(serializer.deserialize(getString(item)));
-        }
+  @Test
+  public void loadData() throws Exception {
+    for (JsonNode item : json) {
+      MoreoverTestUtil.validate(serializer.deserialize(getString(item)));
     }
+  }
 
 
-    private String getString(JsonNode jsonNode)  {
-        try {
-            return new ObjectMapper().writeValueAsString(jsonNode);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
+  private String getString(JsonNode jsonNode)  {
+    try {
+      return new ObjectMapper().writeValueAsString(jsonNode);
+    } catch (JsonProcessingException ex) {
+      throw new RuntimeException(ex);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
index ad0b384..2d93656 100644
--- 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
+++ 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
@@ -18,10 +18,13 @@
 
 package org.apache.streams.moreover.test;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
 import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.moreover.MoreoverTestUtil;
+import org.apache.streams.moreover.MoreoverXmlActivitySerializer;
 import org.apache.streams.pojo.json.Activity;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -31,36 +34,32 @@ import java.io.StringWriter;
 import java.nio.charset.Charset;
 import java.util.List;
 
-import org.apache.streams.moreover.MoreoverXmlActivitySerializer;
-
-import static org.apache.streams.moreover.MoreoverTestUtil.test;
-
 /**
  * Tests ability to serialize moreover xml Strings
  */
 public class MoreoverXmlActivitySerializerIT {
-    ActivitySerializer serializer;
-    private String xml;
+  ActivitySerializer serializer;
+  private String xml;
 
-    @Before
-    public void setup() throws IOException {
-        serializer = new MoreoverXmlActivitySerializer();
-        xml = loadXml();
-    }
+  @Before
+  public void setup() throws IOException {
+    serializer = new MoreoverXmlActivitySerializer();
+    xml = loadXml();
+  }
 
-    @Test
-    public void loadData() throws Exception {
-        List<Activity> activities = 
serializer.deserializeAll(Lists.newArrayList(xml));
-        for (Activity activity : activities) {
-            test(activity);
-        }
+  @Test
+  public void loadData() throws Exception {
+    List<Activity> activities = 
serializer.deserializeAll(Lists.newArrayList(xml));
+    for (Activity activity : activities) {
+      MoreoverTestUtil.validate(activity);
     }
+  }
 
-    private String loadXml() throws IOException {
-        StringWriter writer = new StringWriter();
-        InputStream resourceAsStream = 
this.getClass().getResourceAsStream("/moreover.xml");
-        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
-        return writer.toString();
-    }
+  private String loadXml() throws IOException {
+    StringWriter writer = new StringWriter();
+    InputStream resourceAsStream = 
this.getClass().getResourceAsStream("/moreover.xml");
+    IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+    return writer.toString();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
index 2bc672d..f5b61bf 100644
--- 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
+++ 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
@@ -18,10 +18,11 @@
 
 package org.apache.streams.moreover.test.provider;
 
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.moreover.MoreoverProvider;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import org.apache.streams.moreover.MoreoverProvider;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -32,36 +33,34 @@ import java.io.FileReader;
 import java.io.LineNumberReader;
 
 /**
- * Integration test for MoreoverProviderIT
- *
- * Created by sblackmon on 10/21/16.
+ * Integration test for MoreoverProviderIT.
  */
 @Ignore("this is ignored because the project doesn't have credentials to test 
it with during CI")
 public class MoreoverProviderIT {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(MoreoverProviderIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MoreoverProviderIT.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    @Test
-    public void testRssStreamProvider() throws Exception {
+  @Test
+  public void testRssStreamProvider() throws Exception {
 
-        String configfile = "./target/test-classes/RssStreamProviderIT.conf";
-        String outfile = 
"./target/test-classes/RssStreamProviderIT.stdout.txt";
+    String configfile = "./target/test-classes/RssStreamProviderIT.conf";
+    String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt";
 
-        MoreoverProvider.main(Lists.newArrayList(configfile, 
outfile).toArray(new String[2]));
+    MoreoverProvider.main(Lists.newArrayList(configfile, outfile).toArray(new 
String[2]));
 
-        File out = new File(outfile);
-        assert (out.exists());
-        assert (out.canRead());
-        assert (out.isFile());
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
 
-        FileReader outReader = new FileReader(out);
-        LineNumberReader outCounter = new LineNumberReader(outReader);
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
 
-        while(outCounter.readLine() != null) {}
+    while (outCounter.readLine() != null) {}
 
-        assert (outCounter.getLineNumber() >= 1);
+    assert (outCounter.getLineNumber() >= 1);
 
-    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
index 1df1ff9..d3c763a 100644
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
+++ 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
@@ -18,13 +18,14 @@
 
 package org.apache.streams.rss.processor;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -35,45 +36,45 @@ import java.util.List;
 /**
  * Converts ObjectNode representations of Rome SyndEntries to activities.
  */
-public class RssTypeConverter implements StreamsProcessor{
-
-    public final static String STREAMS_ID = "RssTypeConverter";
+public class RssTypeConverter implements StreamsProcessor {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(RssTypeConverter.class);
+  public static final String STREAMS_ID = "RssTypeConverter";
 
-    private SyndEntryActivitySerializer serializer;
-    private int successCount = 0;
-    private int failCount = 0;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RssTypeConverter.class);
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  private SyndEntryActivitySerializer serializer;
+  private int successCount = 0;
+  private int failCount = 0;
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum datum) {
-        List<StreamsDatum> datums = Lists.newLinkedList();
-        if(datum.getDocument() instanceof ObjectNode) {
-            Activity activity = this.serializer.deserialize((ObjectNode) 
datum.getDocument());
-            datums.add(new StreamsDatum(activity, activity.getId(), 
DateTime.now().withZone(DateTimeZone.UTC)));
-            successCount ++;
-        } else {
-            failCount ++;
-            throw new NotImplementedException("Not implemented for class type 
: "+ datum.getDocument().getClass().toString());
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        }
-        LOGGER.debug("Processor current success count: {} and current fail: 
{}", successCount, failCount);
+  @Override
+  public List<StreamsDatum> process(StreamsDatum datum) {
+    List<StreamsDatum> datums = Lists.newLinkedList();
+    if (datum.getDocument() instanceof ObjectNode) {
+      Activity activity = this.serializer.deserialize((ObjectNode) 
datum.getDocument());
+      datums.add(new StreamsDatum(activity, activity.getId(), 
DateTime.now().withZone(DateTimeZone.UTC)));
+      successCount ++;
+    } else {
+      failCount ++;
+      throw new NotImplementedException("Not implemented for class type : " + 
datum.getDocument().getClass().toString());
 
-        return datums;
     }
+    LOGGER.debug("Processor current success count: {} and current fail: {}", 
successCount, failCount);
 
-    @Override
-    public void prepare(Object o) {
-        this.serializer = new SyndEntryActivitySerializer();
-    }
+    return datums;
+  }
 
-    @Override
-    public void cleanUp() {
+  @Override
+  public void prepare(Object configurationObject) {
+    this.serializer = new SyndEntryActivitySerializer();
+  }
 
-    }
+  @Override
+  public void cleanUp() {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java
deleted file mode 100644
index 4e6efee..0000000
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.rss.provider;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.sun.syndication.feed.synd.SyndEntry;
-
-/**
- * Created by sblackmon on 12/13/13.
- */
-public class RssEventClassifier {
-
-    public static Class detectClass( ObjectNode bean ) {
-        return SyndEntry.class;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
index 75d275d..078356c 100644
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
+++ 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
@@ -18,86 +18,100 @@
 
 package org.apache.streams.rss.provider;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.sun.syndication.feed.synd.SyndEntry;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
 import org.apache.streams.rss.serializer.SyndEntrySerializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.sun.syndication.feed.synd.SyndEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
 import java.util.Random;
 
+/**
+ * RssEventProcessor processes Rss Events.
+ */
 public class RssEventProcessor implements Runnable {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(RssEventProcessor.class);
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    private Queue<SyndEntry> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private Class inClass;
-    private Class outClass;
-
-    private SyndEntryActivitySerializer syndEntryActivitySerializer = new 
SyndEntryActivitySerializer();
-    private SyndEntrySerializer syndEntrySerializer = new 
SyndEntrySerializer();
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> 
outQueue, Class inClass, Class outClass) {
-        this.inQueue = inQueue;
-        this.outQueue = outQueue;
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> 
outQueue, Class outClass) {
-        this.inQueue = inQueue;
-        this.outQueue = outQueue;
-        this.outClass = outClass;
-    }
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RssEventProcessor.class);
+
+  private ObjectMapper mapper = new ObjectMapper();
+
+  private Queue<SyndEntry> inQueue;
+  private Queue<StreamsDatum> outQueue;
+
+  private Class inClass;
+  private Class outClass;
+
+  private SyndEntryActivitySerializer syndEntryActivitySerializer = new 
SyndEntryActivitySerializer();
+  private SyndEntrySerializer syndEntrySerializer = new SyndEntrySerializer();
+
+  public static final String TERMINATE = new String("TERMINATE");
+
+  /**
+   * RssEventProcessor constructor.
+   * @param inQueue inQueue
+   * @param outQueue outQueue
+   * @param inClass inClass
+   * @param outClass outClass
+   */
+  public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> 
outQueue, Class inClass, Class outClass) {
+    this.inQueue = inQueue;
+    this.outQueue = outQueue;
+    this.inClass = inClass;
+    this.outClass = outClass;
+  }
+
+  /**
+   * RssEventProcessor constructor.
+   * @param inQueue inQueue
+   * @param outQueue outQueue
+   * @param outClass outClass
+   */
+  public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> 
outQueue, Class outClass) {
+    this.inQueue = inQueue;
+    this.outQueue = outQueue;
+    this.outClass = outClass;
+  }
+
+  @Override
+  public void run() {
+
+    while (true) {
+      Object item;
+      try {
+        item = inQueue.poll();
+        if (item instanceof String && item.equals(TERMINATE)) {
+          LOGGER.info("Terminating!");
+          break;
+        }
 
-    @Override
-    public void run() {
-
-        while(true) {
-            Object item;
-            try {
-                item = inQueue.poll();
-                if(item instanceof String && item.equals(TERMINATE)) {
-                    LOGGER.info("Terminating!");
-                    break;
-                }
-
-                Thread.sleep(new Random().nextInt(100));
-
-                // if the target is string, just pass-through
-                if( String.class.equals(outClass))
-                    outQueue.offer(new StreamsDatum(item.toString()));
-                else if( SyndEntry.class.equals(outClass))
-                {
-                    outQueue.offer(new StreamsDatum(item));
-                }
-                else if( Activity.class.equals(outClass))
-                {
-                    // convert to desired format
-                    SyndEntry entry = (SyndEntry)item;
-                    if( entry != null ) {
-                        Activity out = 
syndEntryActivitySerializer.deserialize(this.syndEntrySerializer.deserialize((SyndEntry)item));
-
-                        if( out != null )
-                            outQueue.offer(new StreamsDatum(out));
-                    }
-                }
-
-            } catch (Exception e) {
-                e.printStackTrace();
+        Thread.sleep(new Random().nextInt(100));
+
+        // if the target is string, just pass-through
+        if ( String.class.equals(outClass)) {
+          outQueue.offer(new StreamsDatum(item.toString()));
+        } else if ( SyndEntry.class.equals(outClass)) {
+          outQueue.offer(new StreamsDatum(item));
+        } else if ( Activity.class.equals(outClass)) {
+          // convert to desired format
+          SyndEntry entry = (SyndEntry)item;
+          if ( entry != null ) {
+            Activity out = 
syndEntryActivitySerializer.deserialize(this.syndEntrySerializer.deserialize((SyndEntry)item));
+
+            if ( out != null ) {
+              outQueue.offer(new StreamsDatum(out));
             }
+          }
         }
+
+      } catch (Exception ex) {
+        ex.printStackTrace();
+      }
     }
+  }
 
-};
+}

Reply via email to