http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java index 17fde37..91d487e 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java @@ -18,6 +18,9 @@ package org.apache.streams.moreover; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.pojo.json.Activity; + import com.fasterxml.jackson.databind.AnnotationIntrospector; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -25,9 +28,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import com.moreover.api.Article; import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivitySerializer; -import org.apache.streams.moreover.MoreoverUtils; -import org.apache.streams.pojo.json.Activity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,64 +35,64 @@ import java.io.IOException; import java.util.List; /** - * Deserializes Moreover JSON format into Activities + * Deserializes Moreover JSON format into Activities. */ public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> { - private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class); - public MoreoverJsonActivitySerializer() { - } + public MoreoverJsonActivitySerializer() { + } - @Override - public String serializationFormat() { - return "application/json+vnd.moreover.com.v1"; - } + @Override + public String serializationFormat() { + return "application/json+vnd.moreover.com.v1"; + } - @Override - public String serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON"); - } + @Override + public String serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON"); + } - @Override - public Activity deserialize(String serialized) { - serialized = serialized.replaceAll("\\[[ ]*\\]", "null"); - - LOGGER.debug(serialized); - - ObjectMapper mapper = new ObjectMapper(); - AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); - mapper.setAnnotationIntrospector(introspector); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); - mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE); - - Article article; - try { - ObjectNode node = (ObjectNode)mapper.readTree(serialized); - node.remove("tags"); - node.remove("locations"); - node.remove("companies"); - node.remove("topics"); - node.remove("media"); - node.remove("outboundUrls"); - ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed"); - jsonNodes.remove("editorialTopics"); - jsonNodes.remove("tags"); - jsonNodes.remove("autoTopics"); - article = mapper.convertValue(node, Article.class); - } catch (IOException e) { - throw new IllegalArgumentException("Unable to deserialize", e); - } - return MoreoverUtils.convert(article); - } + @Override + public Activity deserialize(String serialized) { + serialized = serialized.replaceAll("\\[[ ]*\\]", "null"); - @Override - public List<Activity> deserializeAll(List<String> serializedList) { - throw new NotImplementedException("Not currently implemented"); + LOGGER.debug(serialized); + + ObjectMapper mapper = new ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); + mapper.setAnnotationIntrospector(introspector); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); + mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE); + + Article article; + try { + ObjectNode node = (ObjectNode)mapper.readTree(serialized); + node.remove("tags"); + node.remove("locations"); + node.remove("companies"); + node.remove("topics"); + node.remove("media"); + node.remove("outboundUrls"); + ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed"); + jsonNodes.remove("editorialTopics"); + jsonNodes.remove("tags"); + jsonNodes.remove("autoTopics"); + article = mapper.convertValue(node, Article.class); + } catch (IOException ex) { + throw new IllegalArgumentException("Unable to deserialize", ex); } + return MoreoverUtils.convert(article); + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + throw new NotImplementedException("Not currently implemented"); + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java index 78d8e9d..2ab6ee4 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java @@ -18,14 +18,6 @@ package org.apache.streams.moreover; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.*; -import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; @@ -33,6 +25,17 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,136 +45,155 @@ import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; import java.math.BigInteger; -import java.util.*; -import java.util.concurrent.*; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** - * Streams Provider for the Moreover Metabase API - * - * To use from command line: - * - * Supply configuration similar to src/test/resources/rss.conf - * - * Launch using: - * - * mvn exec:java -Dexec.mainClass=org.apache.streams.moreover.MoreoverProvider -Dexec.args="rss.conf articles.json" + * Streams Provider for the Moreover Metabase API. */ public class MoreoverProvider implements StreamsProvider { - public final static String STREAMS_ID = "MoreoverProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class); - - protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + public static final String STREAMS_ID = "MoreoverProvider"; - private List<MoreoverKeyData> keys; + private static final Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class); - private MoreoverConfiguration config; + protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - private ExecutorService executor; + private List<MoreoverKeyData> keys; - public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) { - this.config = moreoverConfiguration; - this.keys = Lists.newArrayList(); - for( MoreoverKeyData apiKey : config.getApiKeys()) { - this.keys.add(apiKey); - } - } - - @Override - public String getId() { - return STREAMS_ID; - } - - public void startStream() { + private MoreoverConfiguration config; - for(MoreoverKeyData key : keys) { - MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence()); - executor.submit(new Thread(task)); - LOGGER.info("Started producer for {}", key.getKey()); - } + private ExecutorService executor; + /** + * MoreoverProvider constructor. + * @param moreoverConfiguration MoreoverConfiguration + */ + public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) { + this.config = moreoverConfiguration; + this.keys = Lists.newArrayList(); + for ( MoreoverKeyData apiKey : config.getApiKeys()) { + this.keys.add(apiKey); } + } - @Override - public synchronized StreamsResultSet readCurrent() { + @Override + public String getId() { + return STREAMS_ID; + } - LOGGER.debug("readCurrent: {}", providerQueue.size()); - - Collection<StreamsDatum> currentIterator = Lists.newArrayList(); - Iterators.addAll(currentIterator, providerQueue.iterator()); - - StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator)); - - providerQueue.clear(); - - return current; - } + @Override + public void startStream() { - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; + for (MoreoverKeyData key : keys) { + MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence()); + executor.submit(new Thread(task)); + LOGGER.info("Started producer for {}", key.getKey()); } - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @Override - public boolean isRunning() { - return !executor.isShutdown() && !executor.isTerminated(); - } - - @Override - public void prepare(Object configurationObject) { - LOGGER.debug("Prepare"); - executor = Executors.newSingleThreadExecutor(); - } - - @Override - public void cleanUp() { - - } - - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - MoreoverConfiguration config = new ComponentConfigurator<>(MoreoverConfiguration.class).detectConfiguration(typesafe, "rss"); - MoreoverProvider provider = new MoreoverProvider(config); - - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); + } + + @Override + public synchronized StreamsResultSet readCurrent() { + + LOGGER.debug("readCurrent: {}", providerQueue.size()); + + Collection<StreamsDatum> currentIterator = Lists.newArrayList(); + Iterators.addAll(currentIterator, providerQueue.iterator()); + + StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator)); + + providerQueue.clear(); + + return current; + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + return !executor.isShutdown() && !executor.isTerminated(); + } + + @Override + public void prepare(Object configurationObject) { + LOGGER.debug("Prepare"); + executor = Executors.newSingleThreadExecutor(); + } + + @Override + public void cleanUp() { + + } + + /** + * To use from command line: + * + * <p/> + * Supply configuration similar to src/test/resources/rss.conf + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.moreover.MoreoverProvider -Dexec.args="rss.conf articles.json" + * + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + MoreoverConfiguration config = new ComponentConfigurator<>(MoreoverConfiguration.class).detectConfiguration(typesafe, "rss"); + MoreoverProvider provider = new MoreoverProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } } + while ( provider.isRunning() ); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java index ad92d73..88aec81 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java @@ -18,74 +18,84 @@ package org.apache.streams.moreover; -import com.google.common.collect.ImmutableSet; import org.apache.streams.core.StreamsDatum; + +import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Queue; /** - * Task to pull from the Morever API + * Task that pulls from the Morever API on behalf of MoreoverProvider. */ public class MoreoverProviderTask implements Runnable { - public static final int LATENCY = 10; - public static final int REQUIRED_LATENCY = LATENCY * 1000; - private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class); - - private String lastSequence; - private final String apiKey; - private final String apiId; - private final Queue<StreamsDatum> results; - private final MoreoverClient moClient; - private boolean started = false; + public static final int LATENCY = 10; + public static final int REQUIRED_LATENCY = LATENCY * 1000; + private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class); - public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) { - //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence); - this.apiId = apiId; - this.apiKey = apiKey; - this.results = results; - this.lastSequence = lastSequence; - this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence); - initializeClient(moClient); - } + private String lastSequence; + private final String apiKey; + private final String apiId; + private final Queue<StreamsDatum> results; + private final MoreoverClient moClient; + private boolean started = false; - @Override - public void run() { - while(true) { - try { - ensureTime(moClient); - MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500); - started = true; - lastSequence = result.process().toString(); - for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator())) - results.offer(entry); - logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence); + /** + * MoreoverProviderTask constructor. + * @param apiId apiId + * @param apiKey apiKey + * @param results results + * @param lastSequence lastSequence + */ + public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) { + //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence); + this.apiId = apiId; + this.apiKey = apiKey; + this.results = results; + this.lastSequence = lastSequence; + this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence); + initializeClient(moClient); + } - } catch (Exception e) { - logger.error("Exception while polling moreover", e); - } + @Override + public void run() { + while (true) { + try { + ensureTime(moClient); + MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500); + started = true; + lastSequence = result.process().toString(); + for (StreamsDatum entry : ImmutableSet.copyOf(result.iterator())) { + results.offer(entry); } + logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence); + + } catch (Exception ex) { + logger.error("Exception while polling moreover", ex); + } } + } - private void ensureTime(MoreoverClient moClient) { - try { - long gap = System.currentTimeMillis() - moClient.pullTime; - if (gap < REQUIRED_LATENCY) - Thread.sleep(REQUIRED_LATENCY - gap); - } catch (Exception e) { - logger.warn("Error sleeping for latency"); - } + private void ensureTime(MoreoverClient moClient) { + try { + long gap = System.currentTimeMillis() - moClient.pullTime; + if (gap < REQUIRED_LATENCY) { + Thread.sleep(REQUIRED_LATENCY - gap); + } + } catch (Exception ex) { + logger.warn("Error sleeping for latency"); } + } - private void initializeClient(MoreoverClient moClient) { - try { - moClient.getArticlesAfter(this.lastSequence, 2); - } catch (Exception e) { - logger.error("Failed to start stream, {}", this.apiKey); - logger.error("Exception : ", e); - throw new IllegalStateException("Unable to initialize stream", e); - } + private void initializeClient(MoreoverClient moClient) { + try { + moClient.getArticlesAfter(this.lastSequence, 2); + } catch (Exception ex) { + logger.error("Failed to start stream, {}", this.apiKey); + logger.error("Exception : ", ex); + throw new IllegalStateException("Unable to initialize stream", ex); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java index e07084f..589e647 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java @@ -18,6 +18,8 @@ package org.apache.streams.moreover; +import org.apache.streams.core.StreamsDatum; + import com.fasterxml.aalto.stax.InputFactoryImpl; import com.fasterxml.aalto.stax.OutputFactoryImpl; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -29,7 +31,6 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper; import com.google.common.collect.Lists; import com.moreover.api.Article; import com.moreover.api.ArticlesResponse; -import org.apache.streams.core.StreamsDatum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,163 +39,163 @@ import java.math.BigInteger; import java.util.Iterator; import java.util.List; - public class MoreoverResult implements Iterable<StreamsDatum> { - private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class); - - private ObjectMapper mapper; - private XmlMapper xmlMapper; - - private String xmlString; - private String jsonString; - private ArticlesResponse resultObject; - private ArticlesResponse.Articles articles; - private List<Article> articleArray; - private long start; - private long end; - private String clientId; - private BigInteger maxSequencedId = BigInteger.ZERO; - - protected ArticlesResponse response; - protected List<StreamsDatum> list = Lists.newArrayList(); - - protected MoreoverResult(String clientId, String xmlString, long start, long end) { - this.xmlString = xmlString; - this.clientId = clientId; - this.start = start; - this.end = end; - XmlFactory f = new XmlFactory(new InputFactoryImpl(), - new OutputFactoryImpl()); - - JacksonXmlModule module = new JacksonXmlModule(); - - module.setDefaultUseWrapper(false); - - xmlMapper = new XmlMapper(f, module); - - xmlMapper - .configure( - DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, - Boolean.TRUE); - xmlMapper - .configure( - DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, - Boolean.TRUE); - xmlMapper - .configure( - DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, - Boolean.TRUE); - xmlMapper.configure( - DeserializationFeature.READ_ENUMS_USING_TO_STRING, - Boolean.TRUE); - xmlMapper.configure( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, - Boolean.FALSE); - - mapper = new ObjectMapper(); - - mapper - .configure( - DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, - Boolean.TRUE); - mapper.configure( - DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, - Boolean.TRUE); - mapper - .configure( - DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, - Boolean.TRUE); - mapper.configure( - DeserializationFeature.READ_ENUMS_USING_TO_STRING, - Boolean.TRUE); - + private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class); + + private ObjectMapper mapper; + private XmlMapper xmlMapper; + + private String xmlString; + private String jsonString; + private ArticlesResponse resultObject; + private ArticlesResponse.Articles articles; + private List<Article> articleArray; + private long start; + private long end; + private String clientId; + private BigInteger maxSequencedId = BigInteger.ZERO; + + protected ArticlesResponse response; + protected List<StreamsDatum> list = Lists.newArrayList(); + + protected MoreoverResult(String clientId, String xmlString, long start, long end) { + this.xmlString = xmlString; + this.clientId = clientId; + this.start = start; + this.end = end; + XmlFactory xmlFactory = new XmlFactory(new InputFactoryImpl(), + new OutputFactoryImpl()); + + JacksonXmlModule module = new JacksonXmlModule(); + + module.setDefaultUseWrapper(false); + + xmlMapper = new XmlMapper(xmlFactory, module); + + xmlMapper + .configure( + DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, + Boolean.TRUE); + xmlMapper + .configure( + DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, + Boolean.TRUE); + xmlMapper + .configure( + DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, + Boolean.TRUE); + xmlMapper.configure( + DeserializationFeature.READ_ENUMS_USING_TO_STRING, + Boolean.TRUE); + xmlMapper.configure( + DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + Boolean.FALSE); + + mapper = new ObjectMapper(); + + mapper + .configure( + DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, + Boolean.TRUE); + mapper.configure( + DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, + Boolean.TRUE); + mapper + .configure( + DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, + Boolean.TRUE); + mapper.configure( + DeserializationFeature.READ_ENUMS_USING_TO_STRING, + Boolean.TRUE); + + } + + public String getClientId() { + return clientId; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + /** + * Process batch and + * @return max sequenceId. + */ + public BigInteger process() { + + try { + this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class); + } catch (JsonMappingException ex) { + // theory is this may not be fatal + this.resultObject = (ArticlesResponse) ex.getPath().get(0).getFrom(); + } catch (Exception ex) { + ex.printStackTrace(); + logger.warn("Unable to process document:"); + logger.warn(xmlString); } - public String getClientId() { - return clientId; + if ( this.resultObject.getStatus().equals("FAILURE")) { + logger.warn(this.resultObject.getStatus()); + logger.warn(this.resultObject.getMessageCode()); + logger.warn(this.resultObject.getUserMessage()); + logger.warn(this.resultObject.getDeveloperMessage()); + } else { + this.articles = resultObject.getArticles(); + this.articleArray = articles.getArticle(); + + for (Article article : articleArray) { + BigInteger sequenceid = new BigInteger(article.getSequenceId()); + list.add(new StreamsDatum(article, sequenceid)); + logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid); + if (sequenceid.compareTo(this.maxSequencedId) > 0) { + this.maxSequencedId = sequenceid; + } + } } - public long getStart() { - return start; - } + return this.maxSequencedId; + } - public long getEnd() { - return end; - } + public String getXmlString() { + return this.xmlString; + } - public BigInteger process() { - - try { - this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class); - } catch (JsonMappingException e) { - // theory is this may not be fatal - this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom(); - } catch (Exception e) { - e.printStackTrace(); - logger.warn("Unable to process document:"); - logger.warn(xmlString); - } + public BigInteger getMaxSequencedId() { + return this.maxSequencedId; + } - if( this.resultObject.getStatus().equals("FAILURE")) - { - logger.warn(this.resultObject.getStatus()); - logger.warn(this.resultObject.getMessageCode()); - logger.warn(this.resultObject.getUserMessage()); - logger.warn(this.resultObject.getDeveloperMessage()); - } - else - { - this.articles = resultObject.getArticles(); - this.articleArray = articles.getArticle(); - - for (Article article : articleArray) { - BigInteger sequenceid = new BigInteger(article.getSequenceId()); - list.add(new StreamsDatum(article, sequenceid)); - logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid); - if (sequenceid.compareTo(this.maxSequencedId) > 0) { - this.maxSequencedId = sequenceid; - } - } - } + @Override + public Iterator<StreamsDatum> iterator() { + return list.iterator(); + } - return this.maxSequencedId; - } + protected static class JsonStringIterator implements Iterator<Serializable> { - public String getXmlString() { - return this.xmlString; - } + private Iterator<Serializable> underlying; - public BigInteger getMaxSequencedId() { - return this.maxSequencedId; + protected JsonStringIterator(Iterator<Serializable> underlying) { + this.underlying = underlying; } @Override - public Iterator<StreamsDatum> iterator() { - return list.iterator(); + public boolean hasNext() { + return underlying.hasNext(); } - protected static class JsonStringIterator implements Iterator<Serializable> { - - private Iterator<Serializable> underlying; - - protected JsonStringIterator(Iterator<Serializable> underlying) { - this.underlying = underlying; - } - - @Override - public boolean hasNext() { - return underlying.hasNext(); - } - - @Override - public String next() { - return underlying.next().toString(); - } + @Override + public String next() { + return underlying.next().toString(); + } - @Override - public void remove() { - underlying.remove(); - } + @Override + public void remove() { + underlying.remove(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java deleted file mode 100644 index 0a47bd1..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.moreover; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsResultSet; - -import java.util.Queue; - -public class MoreoverResultSetWrapper extends StreamsResultSet { - - public MoreoverResultSetWrapper(MoreoverResult underlying) { - super((Queue<StreamsDatum>)underlying); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java index 8a91281..f9a3595 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java @@ -18,22 +18,19 @@ package org.apache.streams.moreover; -import com.moreover.api.Article; -import com.moreover.api.Author; -import com.moreover.api.AuthorPublishingPlatform; -import com.moreover.api.Feed; -import com.moreover.api.Source; import org.apache.streams.data.util.ActivityUtil; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Provider; + +import com.moreover.api.Article; +import com.moreover.api.Author; +import com.moreover.api.AuthorPublishingPlatform; +import com.moreover.api.Feed; +import com.moreover.api.Source; import org.joda.time.DateTime; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -43,125 +40,156 @@ import static org.apache.streams.data.util.ActivityUtil.LANGUAGE_EXTENSION; import static org.apache.streams.data.util.ActivityUtil.LOCATION_EXTENSION; import static org.apache.streams.data.util.ActivityUtil.LOCATION_EXTENSION_COUNTRY; import static org.apache.streams.data.util.ActivityUtil.getObjectId; -import static org.apache.streams.data.util.ActivityUtil.getProviderId; /** - * Provides utilities for Moroever data + * Provides utilities for Moreover data. */ public class MoreoverUtils { - private MoreoverUtils() { - } - - public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'"; - - public static Activity convert(Article article) { - Activity activity = new Activity(); - Source source = article.getSource(); - activity.setActor(convert(article.getAuthor(), source.getName())); - activity.setProvider(convert(source)); - activity.setTarget(convertTarget(source)); - activity.setObject(convertObject(article)); - activity.setPublished(DateTime.parse(article.getPublishedDate())); - activity.setContent(article.getContent()); - activity.setTitle(article.getTitle()); - activity.setVerb("posted"); - fixActivityId(activity); - addLocationExtension(activity, source); - addLanguageExtension(activity, article); - activity.setLinks(convertLinks(article)); - return activity; - } - private static void fixActivityId(Activity activity) { - if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) { - activity.setId(null); - } + private MoreoverUtils() { + } + + public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'"; + + /** + * convert article into Activity. + * @param article article + * @return Activity + */ + public static Activity convert(Article article) { + Activity activity = new Activity(); + Source source = article.getSource(); + activity.setActor(convert(article.getAuthor(), source.getName())); + activity.setProvider(convert(source)); + activity.setTarget(convertTarget(source)); + activity.setObject(convertObject(article)); + activity.setPublished(DateTime.parse(article.getPublishedDate())); + activity.setContent(article.getContent()); + activity.setTitle(article.getTitle()); + activity.setVerb("posted"); + fixActivityId(activity); + addLocationExtension(activity, source); + addLanguageExtension(activity, article); + activity.setLinks(convertLinks(article)); + return activity; + } + + /** + * convert Source to Provider. + * @param source Source + * @return Provider + */ + public static Provider convert(Source source) { + Provider provider = new Provider(); + Feed feed = source.getFeed(); + String display = getProviderId(feed); + provider.setId(ActivityUtil.getProviderId(display.trim().toLowerCase().replace(" ", "_"))); + provider.setDisplayName(display); + provider.setUrl(feed.getUrl()); + return provider; + } + + /** + * convert Author and platformName to Actor. + * @param author Author + * @param platformName platformName + * @return $.actor + */ + public static ActivityObject convert(Author author, String platformName) { + ActivityObject actor = new ActivityObject(); + AuthorPublishingPlatform platform = author.getPublishingPlatform(); + String userId = platform.getUserId(); + if (userId != null) { + actor.setId(ActivityUtil.getPersonId(getProviderId(platformName), userId)); } - - private static List convertLinks(Article article) { - List<String> list = new LinkedList<>(); - Article.OutboundUrls outboundUrls = article.getOutboundUrls(); - if (outboundUrls != null) { - for (String url : outboundUrls.getOutboundUrl()) { - list.add(url); - } - } - return list; - } - - public static ActivityObject convertTarget(Source source) { - ActivityObject object = new ActivityObject(); - object.setUrl(source.getHomeUrl()); - object.setDisplayName(source.getName()); - return object; - } - - public static ActivityObject convertObject(Article article) { - ActivityObject object = new ActivityObject(); - object.setContent(article.getContent()); - object.setSummary(article.getTitle()); - object.setUrl(article.getOriginalUrl()); - object.setObjectType(article.getDataFormat()); - String type = article.getDataFormat().equals("text") ? "article" : article.getDataFormat(); - object.setId(getObjectId(getProviderID(article.getSource().getFeed()), type, article.getId())); - object.setPublished(DateTime.parse(article.getPublishedDate())); - return object; - } - - public static Provider convert(Source source) { - Provider provider = new Provider(); - Feed feed = source.getFeed(); - String display = getProviderID(feed); - provider.setId(getProviderId(display.trim().toLowerCase().replace(" ", "_"))); - provider.setDisplayName(display); - provider.setUrl(feed.getUrl()); - return provider; + actor.setDisplayName(author.getName()); + actor.setUrl(author.getHomeUrl()); + actor.setSummary(author.getDescription()); + actor.setAdditionalProperty("email", author.getEmail()); + return actor; + } + + private static void fixActivityId(Activity activity) { + if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) { + activity.setId(null); } - - public static ActivityObject convert(Author author, String platformName) { - ActivityObject actor = new ActivityObject(); - AuthorPublishingPlatform platform = author.getPublishingPlatform(); - String userId = platform.getUserId(); - if (userId != null) actor.setId(ActivityUtil.getPersonId(getProviderID(platformName), userId)); - actor.setDisplayName(author.getName()); - actor.setUrl(author.getHomeUrl()); - actor.setSummary(author.getDescription()); - actor.setAdditionalProperty("email", author.getEmail()); - return actor; + } + + private static List convertLinks(Article article) { + List<String> list = new LinkedList<>(); + Article.OutboundUrls outboundUrls = article.getOutboundUrls(); + if (outboundUrls != null) { + for (String url : outboundUrls.getOutboundUrl()) { + list.add(url); + } } - - public static void addLocationExtension(Activity activity, Source value) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - String country = value.getLocation().getCountryCode() == null ? value.getLocation().getCountry() : value.getLocation().getCountryCode(); - if (country != null) { - Map<String, Object> location = new HashMap<>(); - location.put(LOCATION_EXTENSION_COUNTRY, country); - extensions.put(LOCATION_EXTENSION, location); - } + return list; + } + + /** + * convertTarget. + * @param source source + * @return ActivityObject $.target + */ + public static ActivityObject convertTarget(Source source) { + ActivityObject object = new ActivityObject(); + object.setUrl(source.getHomeUrl()); + object.setDisplayName(source.getName()); + return object; + } + + /** + * convertObject. + * @param article article + * @return ActivityObject $.object + */ + public static ActivityObject convertObject(Article article) { + ActivityObject object = new ActivityObject(); + object.setContent(article.getContent()); + object.setSummary(article.getTitle()); + object.setUrl(article.getOriginalUrl()); + object.setObjectType(article.getDataFormat()); + String type = article.getDataFormat().equals("text") ? "article" : article.getDataFormat(); + object.setId(getObjectId(getProviderId(article.getSource().getFeed()), type, article.getId())); + object.setPublished(DateTime.parse(article.getPublishedDate())); + return object; + } + + /** + * addLocationExtension. + * @param activity Activity + * @param source Source + */ + public static void addLocationExtension(Activity activity, Source source) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + String country = source.getLocation().getCountryCode() == null + ? source.getLocation().getCountry() + : source.getLocation().getCountryCode(); + if (country != null) { + Map<String, Object> location = new HashMap<>(); + location.put(LOCATION_EXTENSION_COUNTRY, country); + extensions.put(LOCATION_EXTENSION, location); } - - public static void addLanguageExtension(Activity activity, Article value) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - String language = value.getLanguage(); - if (language != null) { - extensions.put(LANGUAGE_EXTENSION, language); - } + } + + /** + * addLanguageExtension. + * @param activity Activity + * @param article Article + */ + public static void addLanguageExtension(Activity activity, Article article) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + String language = article.getLanguage(); + if (language != null) { + extensions.put(LANGUAGE_EXTENSION, language); } + } - public static Date parse(String str) { - DateFormat fmt = new SimpleDateFormat(DATE_FORMAT); - try { - return fmt.parse(str); - } catch (ParseException e) { - throw new IllegalArgumentException("Invalid date format", e); - } - } + private static String getProviderId(Feed feed) { + return getProviderId(feed.getPublishingPlatform() == null ? feed.getMediaType() : feed.getPublishingPlatform()); + } - private static String getProviderID(Feed feed) { - return getProviderID(feed.getPublishingPlatform() == null ? feed.getMediaType() : feed.getPublishingPlatform()); - } - - private static String getProviderID(String feed) { - return feed.toLowerCase().replace(" ", "_").trim(); - } + private static String getProviderId(String feed) { + return feed.toLowerCase().replace(" ", "_").trim(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java index 4b7b3b0..fe4378c 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java @@ -18,89 +18,89 @@ package org.apache.streams.moreover; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.pojo.json.Activity; + import com.moreover.api.Article; import com.moreover.api.ArticlesResponse; import com.moreover.api.ObjectFactory; import org.apache.commons.lang.SerializationException; -import org.apache.streams.data.ActivitySerializer; -import org.apache.streams.moreover.MoreoverUtils; -import org.apache.streams.pojo.json.Activity; +import java.io.StringReader; +import java.util.LinkedList; +import java.util.List; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBElement; import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; -import java.io.StringReader; -import java.util.LinkedList; -import java.util.List; /** - * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity} + * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}. */ public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> { - //JAXBContext is threadsafe (supposedly) - private final JAXBContext articleContext; - private final JAXBContext articlesContext; + //JAXBContext is threadsafe (supposedly) + private final JAXBContext articleContext; + private final JAXBContext articlesContext; - public MoreoverXmlActivitySerializer() { - articleContext = createContext(Article.class); - articlesContext = createContext(ArticlesResponse.class); - } + public MoreoverXmlActivitySerializer() { + articleContext = createContext(Article.class); + articlesContext = createContext(ArticlesResponse.class); + } - @Override - public String serializationFormat() { - return "application/xml+vnd.moreover.com.v1"; - } + @Override + public String serializationFormat() { + return "application/xml+vnd.moreover.com.v1"; + } - @Override - public String serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Moreover"); - } + @Override + public String serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Moreover"); + } - @Override - public Activity deserialize(String serialized) { - Article article = deserializeMoreover(serialized); - return MoreoverUtils.convert(article); - } + @Override + public Activity deserialize(String serialized) { + Article article = deserializeMoreover(serialized); + return MoreoverUtils.convert(article); + } - @Override - public List<Activity> deserializeAll(List<String> serializedList) { - List<Activity> activities = new LinkedList<Activity>(); - for(String item : serializedList) { - ArticlesResponse response = deserializeMoreoverResponse(item); - for(Article article : response.getArticles().getArticle()) { - activities.add(MoreoverUtils.convert(article)); - } - } - return activities; + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + List<Activity> activities = new LinkedList<Activity>(); + for (String item : serializedList) { + ArticlesResponse response = deserializeMoreoverResponse(item); + for (Article article : response.getArticles().getArticle()) { + activities.add(MoreoverUtils.convert(article)); + } } + return activities; + } - private Article deserializeMoreover(String serialized){ - try { - Unmarshaller unmarshaller = articleContext.createUnmarshaller(); - return (Article) unmarshaller.unmarshal(new StringReader(serialized)); - } catch (JAXBException e) { - throw new SerializationException("Unable to deserialize Moreover data", e); - } + private Article deserializeMoreover(String serialized) { + try { + Unmarshaller unmarshaller = articleContext.createUnmarshaller(); + return (Article) unmarshaller.unmarshal(new StringReader(serialized)); + } catch (JAXBException ex) { + throw new SerializationException("Unable to deserialize Moreover data", ex); } + } - private ArticlesResponse deserializeMoreoverResponse(String serialized){ - try { - Unmarshaller unmarshaller = articlesContext.createUnmarshaller(); - return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue(); - } catch (JAXBException e) { - throw new SerializationException("Unable to deserialize Moreover data", e); - } + private ArticlesResponse deserializeMoreoverResponse(String serialized) { + try { + Unmarshaller unmarshaller = articlesContext.createUnmarshaller(); + return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue(); + } catch (JAXBException ex) { + throw new SerializationException("Unable to deserialize Moreover data", ex); } + } - private JAXBContext createContext(Class articleClass) { - JAXBContext context; - try { - context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader()); - } catch (JAXBException e) { - throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e); - } - return context; + private JAXBContext createContext(Class articleClass) { + JAXBContext context; + try { + context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader()); + } catch (JAXBException ex) { + throw new IllegalStateException("Unable to create JAXB Context for Moreover data", ex); } + return context; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java index cdd5822..c9bd823 100644 --- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java +++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java @@ -19,25 +19,35 @@ package org.apache.streams.moreover; import org.apache.streams.pojo.json.Activity; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.util.regex.Pattern.matches; -import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; +/** + * MoreoverTestUtil. + */ public class MoreoverTestUtil { - private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverTestUtil.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MoreoverTestUtil.class); - public static void test(Activity activity) { - assertThat(activity, is(not(nullValue()))); - assertThat(activity.getActor(), is(not(nullValue()))); - assertThat(activity.getObject(), is(not(nullValue()))); - if(activity.getObject().getId() != null) { - assertThat(matches("id:.*:[a-z]*s:[a-zA-Z0-9]*", activity.getObject().getId()), is(true)); - } - assertThat(activity.getObject().getObjectType(), is(not(nullValue()))); - LOGGER.debug(activity.getPublished().toString()); + /** + * + * @param activity + */ + public static void validate(Activity activity) { + assertThat(activity, is(not(nullValue()))); + assertThat(activity.getActor(), is(not(nullValue()))); + assertThat(activity.getObject(), is(not(nullValue()))); + if(activity.getObject().getId() != null) { + assertThat(matches("id:.*:[a-z]*s:[a-zA-Z0-9]*", activity.getObject().getId()), is(true)); } + assertThat(activity.getObject().getObjectType(), is(not(nullValue()))); + LOGGER.debug(activity.getPublished().toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java index 94ef097..b7ae076 100644 --- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java +++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java @@ -18,13 +18,15 @@ package org.apache.streams.moreover.test; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.moreover.MoreoverJsonActivitySerializer; +import org.apache.streams.moreover.MoreoverTestUtil; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.IOUtils; -import org.apache.streams.data.ActivitySerializer; -import org.apache.streams.moreover.MoreoverJsonActivitySerializer; import org.junit.Before; import org.junit.Test; @@ -32,45 +34,48 @@ import java.io.InputStream; import java.io.StringWriter; import java.nio.charset.Charset; -import static org.apache.streams.moreover.MoreoverTestUtil.test; - /** - * Tests ability to serialize moreover json Strings + * Tests ability to serialize moreover json Strings. */ public class MoreoverJsonActivitySerializerIT { - JsonNode json; - ActivitySerializer serializer = new MoreoverJsonActivitySerializer(); - ObjectMapper mapper; - @Before - public void setup() throws Exception { + JsonNode json; + ActivitySerializer serializer = new MoreoverJsonActivitySerializer(); + ObjectMapper mapper; - StringWriter writer = new StringWriter(); - InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.json"); - IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8")); + /** + * Before. + * @throws Exception Exception + */ + @Before + public void setup() throws Exception { - mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + StringWriter writer = new StringWriter(); + InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.json"); + IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8")); - json = mapper.readValue(writer.toString(), JsonNode.class); - } + mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + + json = mapper.readValue(writer.toString(), JsonNode.class); + } - @Test - public void loadData() throws Exception { - for (JsonNode item : json) { - test(serializer.deserialize(getString(item))); - } + @Test + public void loadData() throws Exception { + for (JsonNode item : json) { + MoreoverTestUtil.validate(serializer.deserialize(getString(item))); } + } - private String getString(JsonNode jsonNode) { - try { - return new ObjectMapper().writeValueAsString(jsonNode); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + private String getString(JsonNode jsonNode) { + try { + return new ObjectMapper().writeValueAsString(jsonNode); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java index ad0b384..2d93656 100644 --- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java +++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java @@ -18,10 +18,13 @@ package org.apache.streams.moreover.test; -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.moreover.MoreoverTestUtil; +import org.apache.streams.moreover.MoreoverXmlActivitySerializer; import org.apache.streams.pojo.json.Activity; + +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; import org.junit.Before; import org.junit.Test; @@ -31,36 +34,32 @@ import java.io.StringWriter; import java.nio.charset.Charset; import java.util.List; -import org.apache.streams.moreover.MoreoverXmlActivitySerializer; - -import static org.apache.streams.moreover.MoreoverTestUtil.test; - /** * Tests ability to serialize moreover xml Strings */ public class MoreoverXmlActivitySerializerIT { - ActivitySerializer serializer; - private String xml; + ActivitySerializer serializer; + private String xml; - @Before - public void setup() throws IOException { - serializer = new MoreoverXmlActivitySerializer(); - xml = loadXml(); - } + @Before + public void setup() throws IOException { + serializer = new MoreoverXmlActivitySerializer(); + xml = loadXml(); + } - @Test - public void loadData() throws Exception { - List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml)); - for (Activity activity : activities) { - test(activity); - } + @Test + public void loadData() throws Exception { + List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml)); + for (Activity activity : activities) { + MoreoverTestUtil.validate(activity); } + } - private String loadXml() throws IOException { - StringWriter writer = new StringWriter(); - InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.xml"); - IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8")); - return writer.toString(); - } + private String loadXml() throws IOException { + StringWriter writer = new StringWriter(); + InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.xml"); + IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8")); + return writer.toString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java index 2bc672d..f5b61bf 100644 --- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java +++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java @@ -18,10 +18,11 @@ package org.apache.streams.moreover.test.provider; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.moreover.MoreoverProvider; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; -import org.apache.streams.moreover.MoreoverProvider; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -32,36 +33,34 @@ import java.io.FileReader; import java.io.LineNumberReader; /** - * Integration test for MoreoverProviderIT - * - * Created by sblackmon on 10/21/16. + * Integration test for MoreoverProviderIT. */ @Ignore("this is ignored because the project doesn't have credentials to test it with during CI") public class MoreoverProviderIT { - private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MoreoverProviderIT.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - @Test - public void testRssStreamProvider() throws Exception { + @Test + public void testRssStreamProvider() throws Exception { - String configfile = "./target/test-classes/RssStreamProviderIT.conf"; - String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt"; + String configfile = "./target/test-classes/RssStreamProviderIT.conf"; + String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt"; - MoreoverProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + MoreoverProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() >= 1); + assert (outCounter.getLineNumber() >= 1); - } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java index 1df1ff9..d3c763a 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java @@ -18,13 +18,14 @@ package org.apache.streams.rss.processor; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.pojo.json.Activity; import org.apache.streams.rss.serializer.SyndEntryActivitySerializer; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import org.apache.commons.lang.NotImplementedException; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.slf4j.Logger; @@ -35,45 +36,45 @@ import java.util.List; /** * Converts ObjectNode representations of Rome SyndEntries to activities. */ -public class RssTypeConverter implements StreamsProcessor{ - - public final static String STREAMS_ID = "RssTypeConverter"; +public class RssTypeConverter implements StreamsProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(RssTypeConverter.class); + public static final String STREAMS_ID = "RssTypeConverter"; - private SyndEntryActivitySerializer serializer; - private int successCount = 0; - private int failCount = 0; + private static final Logger LOGGER = LoggerFactory.getLogger(RssTypeConverter.class); - @Override - public String getId() { - return STREAMS_ID; - } + private SyndEntryActivitySerializer serializer; + private int successCount = 0; + private int failCount = 0; - @Override - public List<StreamsDatum> process(StreamsDatum datum) { - List<StreamsDatum> datums = Lists.newLinkedList(); - if(datum.getDocument() instanceof ObjectNode) { - Activity activity = this.serializer.deserialize((ObjectNode) datum.getDocument()); - datums.add(new StreamsDatum(activity, activity.getId(), DateTime.now().withZone(DateTimeZone.UTC))); - successCount ++; - } else { - failCount ++; - throw new NotImplementedException("Not implemented for class type : "+ datum.getDocument().getClass().toString()); + @Override + public String getId() { + return STREAMS_ID; + } - } - LOGGER.debug("Processor current success count: {} and current fail: {}", successCount, failCount); + @Override + public List<StreamsDatum> process(StreamsDatum datum) { + List<StreamsDatum> datums = Lists.newLinkedList(); + if (datum.getDocument() instanceof ObjectNode) { + Activity activity = this.serializer.deserialize((ObjectNode) datum.getDocument()); + datums.add(new StreamsDatum(activity, activity.getId(), DateTime.now().withZone(DateTimeZone.UTC))); + successCount ++; + } else { + failCount ++; + throw new NotImplementedException("Not implemented for class type : " + datum.getDocument().getClass().toString()); - return datums; } + LOGGER.debug("Processor current success count: {} and current fail: {}", successCount, failCount); - @Override - public void prepare(Object o) { - this.serializer = new SyndEntryActivitySerializer(); - } + return datums; + } - @Override - public void cleanUp() { + @Override + public void prepare(Object configurationObject) { + this.serializer = new SyndEntryActivitySerializer(); + } - } + @Override + public void cleanUp() { + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java deleted file mode 100644 index 4e6efee..0000000 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.rss.provider; - -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.sun.syndication.feed.synd.SyndEntry; - -/** - * Created by sblackmon on 12/13/13. - */ -public class RssEventClassifier { - - public static Class detectClass( ObjectNode bean ) { - return SyndEntry.class; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java index 75d275d..078356c 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java @@ -18,86 +18,100 @@ package org.apache.streams.rss.provider; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.sun.syndication.feed.synd.SyndEntry; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.json.Activity; import org.apache.streams.rss.serializer.SyndEntryActivitySerializer; import org.apache.streams.rss.serializer.SyndEntrySerializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sun.syndication.feed.synd.SyndEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Queue; import java.util.Random; +/** + * RssEventProcessor processes Rss Events. + */ public class RssEventProcessor implements Runnable { - private final static Logger LOGGER = LoggerFactory.getLogger(RssEventProcessor.class); - - private ObjectMapper mapper = new ObjectMapper(); - - private Queue<SyndEntry> inQueue; - private Queue<StreamsDatum> outQueue; - - private Class inClass; - private Class outClass; - - private SyndEntryActivitySerializer syndEntryActivitySerializer = new SyndEntryActivitySerializer(); - private SyndEntrySerializer syndEntrySerializer = new SyndEntrySerializer(); - - public final static String TERMINATE = new String("TERMINATE"); - - public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) { - this.inQueue = inQueue; - this.outQueue = outQueue; - this.inClass = inClass; - this.outClass = outClass; - } - - public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class outClass) { - this.inQueue = inQueue; - this.outQueue = outQueue; - this.outClass = outClass; - } + private static final Logger LOGGER = LoggerFactory.getLogger(RssEventProcessor.class); + + private ObjectMapper mapper = new ObjectMapper(); + + private Queue<SyndEntry> inQueue; + private Queue<StreamsDatum> outQueue; + + private Class inClass; + private Class outClass; + + private SyndEntryActivitySerializer syndEntryActivitySerializer = new SyndEntryActivitySerializer(); + private SyndEntrySerializer syndEntrySerializer = new SyndEntrySerializer(); + + public static final String TERMINATE = new String("TERMINATE"); + + /** + * RssEventProcessor constructor. + * @param inQueue inQueue + * @param outQueue outQueue + * @param inClass inClass + * @param outClass outClass + */ + public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) { + this.inQueue = inQueue; + this.outQueue = outQueue; + this.inClass = inClass; + this.outClass = outClass; + } + + /** + * RssEventProcessor constructor. + * @param inQueue inQueue + * @param outQueue outQueue + * @param outClass outClass + */ + public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class outClass) { + this.inQueue = inQueue; + this.outQueue = outQueue; + this.outClass = outClass; + } + + @Override + public void run() { + + while (true) { + Object item; + try { + item = inQueue.poll(); + if (item instanceof String && item.equals(TERMINATE)) { + LOGGER.info("Terminating!"); + break; + } - @Override - public void run() { - - while(true) { - Object item; - try { - item = inQueue.poll(); - if(item instanceof String && item.equals(TERMINATE)) { - LOGGER.info("Terminating!"); - break; - } - - Thread.sleep(new Random().nextInt(100)); - - // if the target is string, just pass-through - if( String.class.equals(outClass)) - outQueue.offer(new StreamsDatum(item.toString())); - else if( SyndEntry.class.equals(outClass)) - { - outQueue.offer(new StreamsDatum(item)); - } - else if( Activity.class.equals(outClass)) - { - // convert to desired format - SyndEntry entry = (SyndEntry)item; - if( entry != null ) { - Activity out = syndEntryActivitySerializer.deserialize(this.syndEntrySerializer.deserialize((SyndEntry)item)); - - if( out != null ) - outQueue.offer(new StreamsDatum(out)); - } - } - - } catch (Exception e) { - e.printStackTrace(); + Thread.sleep(new Random().nextInt(100)); + + // if the target is string, just pass-through + if ( String.class.equals(outClass)) { + outQueue.offer(new StreamsDatum(item.toString())); + } else if ( SyndEntry.class.equals(outClass)) { + outQueue.offer(new StreamsDatum(item)); + } else if ( Activity.class.equals(outClass)) { + // convert to desired format + SyndEntry entry = (SyndEntry)item; + if ( entry != null ) { + Activity out = syndEntryActivitySerializer.deserialize(this.syndEntrySerializer.deserialize((SyndEntry)item)); + + if ( out != null ) { + outQueue.offer(new StreamsDatum(out)); } + } } + + } catch (Exception ex) { + ex.printStackTrace(); + } } + } -}; +}