http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java deleted file mode 100644 index 9801c30..0000000 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.rss.serializer; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.data.util.RFC3339Utils; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.*; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class SyndEntryActivityConverter implements ActivityConverter<ObjectNode> { - - private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivityConverter.class); - - private boolean includeRomeExtension; - - public SyndEntryActivityConverter() { - this(true); - } - - public SyndEntryActivityConverter(boolean includeRomeExtension) { - this.includeRomeExtension = includeRomeExtension; - } - - - @Override - public List<Activity> deserializeAll(List<ObjectNode> objectNodes) { - List<Activity> result = Lists.newLinkedList(); - for (ObjectNode node : objectNodes) { - result.add(deserialize(node)); - } - return result; - } - - @Override - public String serializationFormat() { - return "application/streams-provider-rss"; - } - - @Override - public ObjectNode serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Rome"); - } - - @Override - public Activity deserialize(ObjectNode syndEntry) { - return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension); - } - - public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) { - Preconditions.checkNotNull(entry); - - Activity activity = new Activity(); - Provider provider = buildProvider(entry); - Actor actor = buildActor(entry); - ActivityObject activityObject = buildActivityObject(entry); - - activityObject.setUrl(provider.getUrl()); - activityObject.setAuthor(actor.getAuthor()); - - activity.setUrl(provider.getUrl()); - activity.setProvider(provider); - activity.setActor(actor); - activity.setVerb("post"); - activity.setId("id:rss:post:" + activity.getUrl()); - - JsonNode published = entry.get("publishedDate"); - if (published != null) { - try { - activity.setPublished(RFC3339Utils.parseToUTC(published.textValue())); - } catch (Exception e) { - LOGGER.warn("Failed to parse date : {}", published.textValue()); - - DateTime now = DateTime.now().withZone(DateTimeZone.UTC); - activity.setPublished(now); - } - } - - activity.setUpdated(activityObject.getUpdated()); - activity.setObject(activityObject); - - if (withExtension) { - activity = addRomeExtension(activity, entry); - } - - return activity; - } - - /** - * Given an RSS entry, extra out the author and actor information and return it - * in an actor object - * - * @param entry - * @return - */ - private Actor buildActor(ObjectNode entry) { - Author author = new Author(); - Actor actor = new Actor(); - - if (entry.get("author") != null) { - author.setId(entry.get("author").textValue()); - author.setDisplayName(entry.get("author").textValue()); - - actor.setAuthor(author); - String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null; - - actor.setId("id:rss:" + uriToSet + ":" + author.getId()); - actor.setDisplayName(author.getDisplayName()); - } - - return actor; - } - - /** - * Given an RSS object, build the ActivityObject - * - * @param entry - * @return - */ - private ActivityObject buildActivityObject(ObjectNode entry) { - ActivityObject activityObject = new ActivityObject(); - - JsonNode summary = entry.get("description"); - if (summary != null) - activityObject.setSummary(summary.textValue()); - else if((summary = entry.get("title")) != null) { - activityObject.setSummary(summary.textValue()); - } - - return activityObject; - } - - /** - * Given an RSS object, build and return the Provider object - * - * @param entry - * @return - */ - private Provider buildProvider(ObjectNode entry) { - Provider provider = new Provider(); - - String link = null; - String uri = null; - String resourceLocation = null; - - if (entry.get("link") != null) - link = entry.get("link").textValue(); - if (entry.get("uri") != null) - uri = entry.get("uri").textValue(); - - /** - * Order of precedence for resourceLocation selection - * - * 1. Valid URI - * 2. Valid Link - * 3. Non-null URI - * 4. Non-null Link - */ - if(isValidResource(uri)) - resourceLocation = uri; - else if(isValidResource(link)) - resourceLocation = link; - else if(uri != null || link != null) { - resourceLocation = (uri != null) ? uri : link; - } - - provider.setId("id:providers:rss"); - provider.setUrl(resourceLocation); - provider.setDisplayName("RSS"); - - return provider; - } - - /** - * Tests whether or not the passed in resource is a valid URI - * @param resource - * @return boolean of whether or not the resource is valid - */ - private boolean isValidResource(String resource) { - if(resource != null && resource.startsWith("http") || resource.startsWith("www")) - return true; - return false; - } - - /** - * Given an RSS object and an existing activity, - * add the Rome extension to that activity and return it - * - * @param activity - * @param entry - * @return - */ - private Activity addRomeExtension(Activity activity, ObjectNode entry) { - ObjectMapper mapper = new StreamsJacksonMapper(); - ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class); - ObjectNode extensions = JsonNodeFactory.instance.objectNode(); - - extensions.put("rome", entry); - activityRoot.put("extensions", extensions); - - activity = mapper.convertValue(activityRoot, Activity.class); - - return activity; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java new file mode 100644 index 0000000..06839f3 --- /dev/null +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.rss.serializer; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.data.util.RFC3339Utils; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.*; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; + +public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> { + + private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class); + + private boolean includeRomeExtension; + + public SyndEntryActivitySerializer() { + this(true); + } + + public SyndEntryActivitySerializer(boolean includeRomeExtension) { + this.includeRomeExtension = includeRomeExtension; + } + + + @Override + public List<Activity> deserializeAll(List<ObjectNode> objectNodes) { + List<Activity> result = Lists.newLinkedList(); + for (ObjectNode node : objectNodes) { + result.add(deserialize(node)); + } + return result; + } + + @Override + public String serializationFormat() { + return "application/streams-provider-rss"; + } + + @Override + public ObjectNode serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Rome"); + } + + @Override + public Activity deserialize(ObjectNode syndEntry) { + return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension); + } + + public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) { + Preconditions.checkNotNull(entry); + + Activity activity = new Activity(); + Provider provider = buildProvider(entry); + Actor actor = buildActor(entry); + ActivityObject activityObject = buildActivityObject(entry); + + activityObject.setUrl(provider.getUrl()); + activityObject.setAuthor(actor.getAuthor()); + + activity.setUrl(provider.getUrl()); + activity.setProvider(provider); + activity.setActor(actor); + activity.setVerb("post"); + activity.setId("id:rss:post:" + activity.getUrl()); + + JsonNode published = entry.get("publishedDate"); + if (published != null) { + try { + activity.setPublished(RFC3339Utils.parseToUTC(published.textValue())); + } catch (Exception e) { + LOGGER.warn("Failed to parse date : {}", published.textValue()); + + DateTime now = DateTime.now().withZone(DateTimeZone.UTC); + activity.setPublished(now); + } + } + + activity.setUpdated(activityObject.getUpdated()); + activity.setObject(activityObject); + + if (withExtension) { + activity = addRomeExtension(activity, entry); + } + + return activity; + } + + /** + * Given an RSS entry, extra out the author and actor information and return it + * in an actor object + * + * @param entry + * @return + */ + private Actor buildActor(ObjectNode entry) { + Author author = new Author(); + Actor actor = new Actor(); + + if (entry.get("author") != null) { + author.setId(entry.get("author").textValue()); + author.setDisplayName(entry.get("author").textValue()); + + actor.setAuthor(author); + String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null; + + actor.setId("id:rss:" + uriToSet + ":" + author.getId()); + actor.setDisplayName(author.getDisplayName()); + } + + return actor; + } + + /** + * Given an RSS object, build the ActivityObject + * + * @param entry + * @return + */ + private ActivityObject buildActivityObject(ObjectNode entry) { + ActivityObject activityObject = new ActivityObject(); + + JsonNode summary = entry.get("description"); + if (summary != null) + activityObject.setSummary(summary.textValue()); + else if((summary = entry.get("title")) != null) { + activityObject.setSummary(summary.textValue()); + } + + return activityObject; + } + + /** + * Given an RSS object, build and return the Provider object + * + * @param entry + * @return + */ + private Provider buildProvider(ObjectNode entry) { + Provider provider = new Provider(); + + String link = null; + String uri = null; + String resourceLocation = null; + + if (entry.get("link") != null) + link = entry.get("link").textValue(); + if (entry.get("uri") != null) + uri = entry.get("uri").textValue(); + + /** + * Order of precedence for resourceLocation selection + * + * 1. Valid URI + * 2. Valid Link + * 3. Non-null URI + * 4. Non-null Link + */ + if(isValidResource(uri)) + resourceLocation = uri; + else if(isValidResource(link)) + resourceLocation = link; + else if(uri != null || link != null) { + resourceLocation = (uri != null) ? uri : link; + } + + provider.setId("id:providers:rss"); + provider.setUrl(resourceLocation); + provider.setDisplayName("RSS"); + + return provider; + } + + /** + * Tests whether or not the passed in resource is a valid URI + * @param resource + * @return boolean of whether or not the resource is valid + */ + private boolean isValidResource(String resource) { + if(resource != null && resource.startsWith("http") || resource.startsWith("www")) + return true; + return false; + } + + /** + * Given an RSS object and an existing activity, + * add the Rome extension to that activity and return it + * + * @param activity + * @param entry + * @return + */ + private Activity addRomeExtension(Activity activity, ObjectNode entry) { + ObjectMapper mapper = new StreamsJacksonMapper(); + ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class); + ObjectNode extensions = JsonNodeFactory.instance.objectNode(); + + extensions.put("rome", entry); + activityRoot.put("extensions", extensions); + + activity = mapper.convertValue(activityRoot, Activity.class); + + return activity; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java index 446f998..fd9a996 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java @@ -26,7 +26,7 @@ import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.Actor; import org.apache.streams.pojo.json.Author; import org.apache.streams.pojo.json.Provider; -import org.apache.streams.rss.serializer.SyndEntryActivityConverter; +import org.apache.streams.rss.serializer.SyndEntryActivitySerializer; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -49,7 +49,7 @@ public class SyndEntryActivitySerizlizerTest { List<Activity> activities = Lists.newLinkedList(); List<ObjectNode> objects = Lists.newLinkedList(); - SyndEntryActivityConverter serializer = new SyndEntryActivityConverter(); + SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer(); while(scanner.hasNext()) { String line = scanner.nextLine(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java index d7bb3cb..90df7fc 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java @@ -19,20 +19,15 @@ package org.apache.streams.sysomos.conversion; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.sysomos.xml.BeatApi; -import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.StringUtils; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Actor; import org.apache.streams.pojo.json.Provider; import org.joda.time.DateTime; -import java.util.List; import java.util.Map; import static org.apache.streams.data.util.ActivityUtil.*; @@ -40,37 +35,10 @@ import static org.apache.streams.data.util.ActivityUtil.*; /** * Converts an instance of a {@link com.sysomos.xml.BeatApi.BeatResponse.Beat} to an {@link org.apache.streams.pojo.json.Activity} */ -public class SysomosBeatActivityConverter implements ActivityConverter<BeatApi.BeatResponse.Beat> { +public class SysomosBeatActivityConverter { public static final String LANGUAGE_KEY = "LANGUAGE"; - @Override - public String serializationFormat() { - return null; - } - - @Override - public BeatApi.BeatResponse.Beat serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException(); - } - - @Override - public Activity deserialize(BeatApi.BeatResponse.Beat serialized) throws ActivitySerializerException { - return convert(serialized); - } - - @Override - public List<Activity> deserializeAll(List<BeatApi.BeatResponse.Beat> serializedList) { - List<Activity> result = Lists.newArrayList(); - for( BeatApi.BeatResponse.Beat item : serializedList ) { - try { - Activity activity = deserialize(item); - result.add(activity); - } catch (ActivitySerializerException e) {} - } - return result; - } - public Activity convert(BeatApi.BeatResponse.Beat beat) { Activity converted = new Activity(); converted.setId(beat.getDocid()); @@ -167,4 +135,5 @@ public class SysomosBeatActivityConverter implements ActivityConverter<BeatApi.B return tags; } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosConverterResolver.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosConverterResolver.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosConverterResolver.java deleted file mode 100644 index 733c4c1..0000000 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosConverterResolver.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.streams.sysomos.conversion; - -import com.sysomos.xml.BeatApi; -import org.apache.streams.data.ActivityConverterResolver; -import org.apache.streams.exceptions.ActivitySerializerException; - -/** - * Ensures sysomos documents can be converted to Activity - */ -public class SysomosConverterResolver implements ActivityConverterResolver { - - @Override - public Class bestSerializer(Class documentClass) throws ActivitySerializerException { - if( documentClass == BeatApi.BeatResponse.Beat.class ) - return SysomosBeatActivityConverter.class; - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosDocumentClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosDocumentClassifier.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosDocumentClassifier.java deleted file mode 100644 index 0db5624..0000000 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosDocumentClassifier.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.apache.streams.sysomos.conversion; - -import com.sysomos.xml.BeatApi; -import org.apache.streams.data.DocumentClassifier; - -/** - * Ensures sysomos documents can be converted to Activity - */ -public class SysomosDocumentClassifier implements DocumentClassifier { - @Override - public Class detectClass(Object document) { - if( document instanceof BeatApi.BeatResponse.Beat ) - return BeatApi.BeatResponse.Beat.class; - else return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java index 1a7a546..db9f416 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java @@ -27,12 +27,8 @@ import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter; import java.util.List; -@Deprecated /** * Stream processor that converts Sysomos type to Activity - * Deprecated: Modules and streams should adopt TypeConverterProcessor and ActivityConverterProcessor - * TODO: Create SysomosDocumentClassifier and SysomosConverterResolver - * TODO: Refactor any streams */ public class SysomosTypeConverter implements StreamsProcessor { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 660afbc..3880135 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -50,8 +50,9 @@ </dependency> <dependency> <groupId>org.apache.streams</groupId> - <artifactId>streams-converters</artifactId> + <artifactId>streams-processor-jackson</artifactId> <version>${project.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.streams</groupId> @@ -80,11 +81,7 @@ <artifactId>hbc-core</artifactId> <version>2.1.0</version> </dependency> - <dependency> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - <version>1.2</version> - </dependency> + <dependency> <groupId>org.twitter4j</groupId> <artifactId>twitter4j-core</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java index 5ad811d..4ca73df 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java @@ -31,7 +31,7 @@ import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.provider.TwitterConfigurator; -import org.apache.streams.twitter.serializer.TwitterDocumentClassifier; +import org.apache.streams.twitter.provider.TwitterEventClassifier; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,7 +111,7 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor { } protected void replace(Activity doc, String json) throws java.io.IOException, ActivitySerializerException { - Class documentSubType = TwitterDocumentClassifier.getInstance().detectClass(json); + Class documentSubType = TwitterEventClassifier.detectClass(json); Object object = mapper.readValue(json, documentSubType); if(documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java index bffdef0..674eef1 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java @@ -26,7 +26,7 @@ import org.apache.streams.core.StreamsProcessor; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.pojo.User; -import org.apache.streams.twitter.serializer.TwitterDocumentClassifier; +import org.apache.streams.twitter.provider.TwitterEventClassifier; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +90,7 @@ public class TwitterProfileProcessor implements StreamsProcessor, Runnable { item = mapper.writeValueAsString((ObjectNode)entry.getDocument()); } - Class inClass = TwitterDocumentClassifier.getInstance().detectClass(item); + Class inClass = TwitterEventClassifier.detectClass(item); User user; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java new file mode 100644 index 0000000..2234739 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.provider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.pojo.*; +import org.apache.streams.twitter.serializer.StreamsTwitterMapper; +import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonUserstreameventActivitySerializer; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Created by sblackmon on 12/13/13. + */ +public class TwitterEventClassifier implements Serializable { + + private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT)); + + public static Class detectClass( String json ) { + Preconditions.checkNotNull(json); + Preconditions.checkArgument(StringUtils.isNotEmpty(json)); + + ObjectNode objectNode; + try { + objectNode = (ObjectNode) mapper.readTree(json); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + + if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null) + return Retweet.class; + else if( objectNode.findValue("delete") != null ) + return Delete.class; + else if( objectNode.findValue("friends") != null || + objectNode.findValue("friends_str") != null ) + return FriendList.class; + else if( objectNode.findValue("target_object") != null ) + return UserstreamEvent.class; + else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null) + return User.class; + else + return Tweet.class; + } + public static ActivitySerializer bestSerializer( String json ) { + + Preconditions.checkNotNull(json); + Preconditions.checkArgument(StringUtils.isNotEmpty(json)); + + ObjectNode objectNode; + try { + objectNode = (ObjectNode) mapper.readTree(json); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + + if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null) + return TwitterJsonRetweetActivitySerializer.getInstance(); + else if( objectNode.findValue("delete") != null ) + return TwitterJsonDeleteActivitySerializer.getInstance(); +// else if( objectNode.findValue("friends") != null || +// objectNode.findValue("friends_str") != null ) +// return FriendList.class; + else if( objectNode.findValue("target_object") != null ) + return TwitterJsonUserstreameventActivitySerializer.getInstance(); + else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null) + return TwitterJsonUserActivitySerializer.getInstance(); + else + return TwitterJsonTweetActivitySerializer.getInstance(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 45bd071..bd67765 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; @@ -32,6 +31,7 @@ import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; import twitter4j.Twitter; import twitter4j.TwitterException; import twitter4j.TwitterFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java index 3b5f763..395bd95 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java @@ -18,13 +18,20 @@ package org.apache.streams.twitter.serializer; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; import com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.streams.data.util.RFC3339Utils; +import org.apache.streams.jackson.StreamsDateTimeDeserializer; +import org.apache.streams.jackson.StreamsDateTimeSerializer; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.jackson.StreamsJacksonModule; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -32,8 +39,9 @@ import org.joda.time.format.DateTimeFormatter; import java.io.IOException; /** + * Created by sblackmon on 3/27/14. + * * Deprecated: Use StreamsJacksonMapper - * TODO: find another place to put TWITTER_FORMAT and delete this class */ @Deprecated public class StreamsTwitterMapper extends StreamsJacksonMapper { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java deleted file mode 100644 index e1f5b5f..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.serializer; - -import org.apache.streams.data.ActivityConverterResolver; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.twitter.pojo.*; - -/** - * Ensures twitter documents can be converted to Activity - */ -public class TwitterConverterResolver implements ActivityConverterResolver { - - public TwitterConverterResolver() { - - } - - private static TwitterConverterResolver instance = new TwitterConverterResolver(); - - public static TwitterConverterResolver getInstance() { - - if( instance == null ) - instance = new TwitterConverterResolver(); - return instance; - - } - - @Override - public Class bestSerializer(Class documentClass) throws ActivitySerializerException { - - if (documentClass == Retweet.class) - return TwitterJsonRetweetActivityConverter.class; - else if (documentClass == Delete.class) - return TwitterJsonDeleteActivityConverter.class; - else if (documentClass == User.class) - return TwitterJsonUserActivityConverter.class; - else if (documentClass == UserstreamEvent.class) - return TwitterJsonUserstreameventActivityConverter.class; - else if (documentClass == FriendList.class) - return null; - else - return TwitterJsonTweetActivityConverter.class; - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java deleted file mode 100644 index 3b61b3b..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.serializer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.lang.StringUtils; -import org.apache.streams.data.DocumentClassifier; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.pojo.*; - -import java.io.IOException; - -/** - * Ensures twitter documents can be converted to Activity - */ -public class TwitterDocumentClassifier implements DocumentClassifier { - - public TwitterDocumentClassifier() { - - } - - private static TwitterDocumentClassifier instance; - - public static TwitterDocumentClassifier getInstance() { - - if( instance == null ) - instance = new TwitterDocumentClassifier(); - return instance; - } - - private static ObjectMapper mapper; - - public Class detectClass(Object document) { - - Preconditions.checkNotNull(document); - Preconditions.checkArgument(document instanceof String); - - String json = (String)document; - Preconditions.checkArgument(StringUtils.isNotEmpty(json)); - - mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT)); - - ObjectNode objectNode; - try { - objectNode = (ObjectNode) mapper.readTree(json); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - - if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null) - return Retweet.class; - else if( objectNode.findValue("delete") != null ) - return Delete.class; - else if( objectNode.findValue("friends") != null || - objectNode.findValue("friends_str") != null ) - return FriendList.class; - else if( objectNode.findValue("target_object") != null ) - return UserstreamEvent.class; - else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null) - return User.class; - else - return Tweet.class; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java deleted file mode 100644 index 5206b08..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.serializer; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.converter.TypeConverterUtil; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.data.ActivityConverterFactory; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.json.Activity; - -import java.util.List; -import java.io.Serializable; - -/* - * Now that we have ActivityConverterProcessor, this shouldn't be neededà - */ -@Deprecated -public class TwitterJsonActivityConverter implements ActivityConverter<String>, Serializable -{ - - public TwitterJsonActivityConverter() { - - } - - private static TwitterJsonActivityConverter instance = new TwitterJsonActivityConverter(); - - public static TwitterJsonActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public String serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException(); - } - - @Override - public Activity deserialize(String serialized) throws ActivitySerializerException { - - Class documentClass = TwitterDocumentClassifier.getInstance().detectClass(serialized); - - Class converterClass = TwitterConverterResolver.getInstance().bestSerializer(documentClass); - - ActivityConverter converter = ActivityConverterFactory.getInstance(converterClass); - - Object typedObject = TypeConverterUtil.convert(serialized, documentClass); - - Activity activity = converter.deserialize(typedObject); - - if( activity == null ) - throw new ActivitySerializerException("unrecognized type"); - - return activity; - } - - @Override - public List<Activity> deserializeAll(List<String> serializedList) { - throw new NotImplementedException(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java new file mode 100644 index 0000000..d1f0de9 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.serializer; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.*; +import org.apache.streams.twitter.provider.TwitterEventClassifier; + +import java.util.List; +import java.io.Serializable; + +public class TwitterJsonActivitySerializer implements ActivitySerializer<String>, Serializable +{ + + public TwitterJsonActivitySerializer() { + + } + + private static TwitterJsonActivitySerializer instance = new TwitterJsonActivitySerializer(); + + public static TwitterJsonActivitySerializer getInstance() { + return instance; + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public String serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException(); + } + + @Override + public Activity deserialize(String serialized) throws ActivitySerializerException { + + ActivitySerializer serializer = TwitterEventClassifier.bestSerializer(serialized); + Activity activity = serializer.deserialize(serialized); + + if( activity == null ) + throw new ActivitySerializerException("unrecognized type"); + + return activity; + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + throw new NotImplementedException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java deleted file mode 100644 index 8d8da28..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.serializer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.apache.streams.twitter.pojo.Delete; -import org.apache.streams.twitter.pojo.Tweet; - -import java.io.Serializable; -import java.util.List; - -import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; - - -/** -* Created with IntelliJ IDEA. -* User: mdelaet -* Date: 9/30/13 -* Time: 9:24 AM -* To change this template use File | Settings | File Templates. -*/ -public class TwitterJsonDeleteActivityConverter implements ActivityConverter<Delete>, Serializable { - - private static TwitterJsonDeleteActivityConverter instance = new TwitterJsonDeleteActivityConverter(); - - public static TwitterJsonDeleteActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public Delete serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException(); - } - - @Override - public Activity deserialize(Delete serialized) throws ActivitySerializerException { - return null; - } - - @Override - public List<Activity> deserializeAll(List<Delete> serializedList) { - return null; - } - - public Activity convert(Delete delete) throws ActivitySerializerException { - - Activity activity = new Activity(); - updateActivity(delete, activity); - return activity; - } - - public ActivityObject buildTarget(Tweet tweet) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java new file mode 100644 index 0000000..b368f71 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.apache.streams.pojo.json.Actor; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Tweet; + +import java.io.Serializable; +import java.util.List; + +import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; + + +/** +* Created with IntelliJ IDEA. +* User: mdelaet +* Date: 9/30/13 +* Time: 9:24 AM +* To change this template use File | Settings | File Templates. +*/ +public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable { + + private static TwitterJsonDeleteActivitySerializer instance = new TwitterJsonDeleteActivitySerializer(); + + public static TwitterJsonDeleteActivitySerializer getInstance() { + return instance; + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public String serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException(); + } + + @Override + public Activity deserialize(String serialized) throws ActivitySerializerException { + return null; + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + return null; + } + + public Activity convert(ObjectNode event) throws ActivitySerializerException { + + ObjectMapper mapper = StreamsTwitterMapper.getInstance(); + Delete delete = null; + try { + delete = mapper.treeToValue(event, Delete.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + Activity activity = new Activity(); + updateActivity(delete, activity); + return activity; + } + + public ActivityObject buildTarget(Tweet tweet) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java deleted file mode 100644 index 4b64932..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.serializer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.twitter.pojo.Retweet; - -import java.io.IOException; -import java.io.Serializable; -import java.util.List; - -import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; - -public class TwitterJsonRetweetActivityConverter implements ActivityConverter<Retweet>, Serializable { - - public TwitterJsonRetweetActivityConverter() { - - } - - private static TwitterJsonRetweetActivityConverter instance = new TwitterJsonRetweetActivityConverter(); - - public static TwitterJsonRetweetActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public Retweet serialize(Activity deserialized) throws ActivitySerializerException { - return null; - } - - @Override - public Activity deserialize(Retweet retweet) throws ActivitySerializerException { - - Activity activity = new Activity(); - updateActivity(retweet, activity); - - return activity; - } - - @Override - public List<Activity> deserializeAll(List<Retweet> serializedList) { - List<Activity> result = Lists.newArrayList(); - for( Retweet item : serializedList ) { - try { - Activity activity = deserialize(item); - result.add(activity); - } catch (ActivitySerializerException e) {} - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java new file mode 100644 index 0000000..58cb769 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Retweet; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; + +public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String>, Serializable { + + public TwitterJsonRetweetActivitySerializer() { + + } + + private static TwitterJsonRetweetActivitySerializer instance = new TwitterJsonRetweetActivitySerializer(); + + public static TwitterJsonRetweetActivitySerializer getInstance() { + return instance; + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public String serialize(Activity deserialized) throws ActivitySerializerException { + return null; + } + + @Override + public Activity deserialize(String event) throws ActivitySerializerException { + + ObjectMapper mapper = StreamsTwitterMapper.getInstance(); + Retweet retweet = null; + try { + retweet = mapper.readValue(event, Retweet.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + Activity activity = new Activity(); + updateActivity(retweet, activity); + + return activity; + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java deleted file mode 100644 index 5cd1075..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.serializer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.twitter.pojo.Tweet; - -import java.io.IOException; -import java.io.Serializable; -import java.util.List; - -import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; - -public class TwitterJsonTweetActivityConverter implements ActivityConverter<Tweet>, Serializable { - - private static TwitterJsonTweetActivityConverter instance = new TwitterJsonTweetActivityConverter(); - - public static TwitterJsonTweetActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public Tweet serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException(); - } - - @Override - public Activity deserialize(Tweet tweet) throws ActivitySerializerException { - - Activity activity = new Activity(); - - updateActivity(tweet, activity); - - return activity; - } - - @Override - public List<Activity> deserializeAll(List<Tweet> serializedList) { - List<Activity> result = Lists.newArrayList(); - for( Tweet item : serializedList ) { - try { - Activity activity = deserialize(item); - result.add(activity); - } catch (ActivitySerializerException e) {} - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java new file mode 100644 index 0000000..e6fc05f --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Tweet; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; + +public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable { + + private static TwitterJsonTweetActivitySerializer instance = new TwitterJsonTweetActivitySerializer(); + + public static TwitterJsonTweetActivitySerializer getInstance() { + return instance; + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public String serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException(); + } + + @Override + public Activity deserialize(String serialized) throws ActivitySerializerException { + + ObjectMapper mapper = StreamsTwitterMapper.getInstance(); + Tweet tweet = null; + try { + tweet = mapper.readValue(serialized, Tweet.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + Activity activity = new Activity(); + + updateActivity(tweet, activity); + + return activity; + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivityConverter.java deleted file mode 100644 index 3cb1278..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivityConverter.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.serializer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.twitter.pojo.User; - -import java.io.IOException; -import java.io.Serializable; -import java.util.List; - -import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.updateActivity; - -public class TwitterJsonUserActivityConverter implements ActivityConverter<User>, Serializable { - - public TwitterJsonUserActivityConverter() {} - - private static TwitterJsonUserActivityConverter instance = new TwitterJsonUserActivityConverter(); - - public static TwitterJsonUserActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public User serialize(Activity deserialized) throws ActivitySerializerException { - return null; - } - - @Override - public Activity deserialize(User user) throws ActivitySerializerException { - - Activity activity = new Activity(); - updateActivity(user, activity); - - return activity; - } - - @Override - public List<Activity> deserializeAll(List<User> serializedList) { - List<Activity> result = Lists.newArrayList(); - for( User item : serializedList ) { - try { - Activity activity = deserialize(item); - result.add(activity); - } catch (ActivitySerializerException e) {} - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java new file mode 100644 index 0000000..1bf935c --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.User; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.updateActivity; + +public class TwitterJsonUserActivitySerializer implements ActivitySerializer<String>, Serializable { + + public TwitterJsonUserActivitySerializer() {} + + private static TwitterJsonUserActivitySerializer instance = new TwitterJsonUserActivitySerializer(); + + public static TwitterJsonUserActivitySerializer getInstance() { + return instance; + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public String serialize(Activity deserialized) throws ActivitySerializerException { + return null; + } + + @Override + public Activity deserialize(String event) throws ActivitySerializerException { + + ObjectMapper mapper = new StreamsJacksonMapper(); + User user = null; + try { + user = mapper.readValue(event, User.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + Activity activity = new Activity(); + updateActivity(user, activity); + + return activity; + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivityConverter.java deleted file mode 100644 index 3da55ad..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivityConverter.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.serializer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Strings; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.apache.streams.pojo.json.Actor; -import org.apache.streams.twitter.pojo.UserstreamEvent; - -import java.util.List; - -import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; - - -/** -* Created with IntelliJ IDEA. -* User: mdelaet -* Date: 9/30/13 -* Time: 9:24 AM -* To change this template use File | Settings | File Templates. -*/ -public class TwitterJsonUserstreameventActivityConverter implements ActivityConverter<UserstreamEvent> { - - private static TwitterJsonUserstreameventActivityConverter instance = new TwitterJsonUserstreameventActivityConverter(); - - public static TwitterJsonUserstreameventActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public UserstreamEvent serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException(); - } - - @Override - public Activity deserialize(UserstreamEvent serialized) throws ActivitySerializerException { - return null; - } - - @Override - public List<Activity> deserializeAll(List<UserstreamEvent> serializedList) { - return null; - } - - public Activity convert(ObjectNode item) throws ActivitySerializerException { - - ObjectMapper mapper = StreamsTwitterMapper.getInstance(); - UserstreamEvent event = null; - try { - event = mapper.treeToValue(item, UserstreamEvent.class); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - - Activity activity = new Activity(); - activity.setActor(buildActor(event)); - activity.setVerb(detectVerb(event)); - activity.setObject(buildActivityObject(event)); - activity.setId(formatId(activity.getVerb())); - if(Strings.isNullOrEmpty(activity.getId())) - throw new ActivitySerializerException("Unable to determine activity id"); - activity.setProvider(getProvider()); - return activity; - } - - public Actor buildActor(UserstreamEvent event) { - Actor actor = new Actor(); - //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr())); - return actor; - } - - public ActivityObject buildActivityObject(UserstreamEvent event) { - ActivityObject actObj = new ActivityObject(); - //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr())); - //actObj.setObjectType("tweet"); - return actObj; - } - - public String detectVerb(UserstreamEvent event) { - return null; - } - - public ActivityObject buildTarget(UserstreamEvent event) { - return null; - } - -}