http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java new file mode 100644 index 0000000..34ac8c4 --- /dev/null +++ b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.streams.example; + +import com.google.common.collect.Lists; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.converter.ActivityConverterProcessor; +import org.apache.streams.converter.ActivityConverterProcessorConfiguration; +import org.apache.streams.converter.TypeConverterProcessor; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.data.ActivityConverter; +import org.apache.streams.data.DocumentClassifier; +import org.apache.streams.example.TwitterFollowNeo4jConfiguration; +import org.apache.streams.graph.GraphHttpConfiguration; +import org.apache.streams.graph.GraphHttpPersistWriter; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.apache.streams.twitter.TwitterFollowingConfiguration; +import org.apache.streams.twitter.converter.TwitterFollowActivityConverter; +import org.apache.streams.twitter.provider.TwitterFollowingProvider; +import org.apache.streams.twitter.converter.TwitterDocumentClassifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Collects friend and follow connections for a set of twitter users and builds a graph + * database in neo4j. 
+ */ +public class TwitterFollowNeo4j implements Runnable { + + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class); + + /** Stream settings; run() reads the twitter and graph sections from it. */ + TwitterFollowNeo4jConfiguration config; + + /** + * Creates the stream with the TwitterFollowNeo4jConfiguration that ComponentConfigurator + * detects from StreamsConfigurator.getConfig(). + */ + public TwitterFollowNeo4j() { + this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + } + + /** + * Creates the stream with an explicit configuration. + * + * @param config configuration whose twitter and graph sections are used by run() + */ + public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) { + this.config = config; + } + + /** + * Wires and starts the pipeline on a LocalStreamBuilder: + * TwitterFollowingProvider -> TypeConverterProcessor(String) -> ActivityConverterProcessor -> GraphHttpPersistWriter. + * The activity converter is restricted to TwitterDocumentClassifier and TwitterFollowActivityConverter. + */ + public void run() { + + TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter(); + TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration); + TypeConverterProcessor converter = new TypeConverterProcessor(String.class); + + ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration = + new ActivityConverterProcessorConfiguration() + .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier())) + .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter())); + ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration); + + GraphHttpConfiguration graphWriterConfiguration = config.getGraph(); + GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration); + + /* Each stage names its upstream stage by id; the numeric argument is 1 for every stage (presumably the number of parallel tasks - confirm against StreamBuilder). */ + StreamBuilder builder = new LocalStreamBuilder(); + builder.newPerpetualStream(TwitterFollowingProvider.STREAMS_ID, followingProvider); + builder.addStreamsProcessor("converter", converter, 1, TwitterFollowingProvider.STREAMS_ID); + builder.addStreamsProcessor("activity", activity, 1, "converter"); + builder.addStreamsPersistWriter("graph", graphPersistWriter, 1, "activity"); + + builder.start(); + } + + /** + * Command-line entry point: logs StreamsConfigurator.config, builds the stream with the + * no-argument constructor and calls run() directly on the calling thread. + * + * @param args not read + */ + public static void main(String[] args) { + + LOGGER.info(StreamsConfigurator.config.toString()); + + TwitterFollowNeo4j stream = new TwitterFollowNeo4j(); + + stream.run(); + + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json new file mode 100644 index 0000000..ffbd39d --- /dev/null +++ b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.example.TwitterFollowNeo4jConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "twitter": { "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", "required": true }, + "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", "type": "object", "required": true } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot b/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot new file mode 100644 index 0000000..2d9e495 --- /dev/null +++ b/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + digraph g { + + //providers + TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"]; + + //processors + TypeConverterProcessor [label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java"]; + ActivityConverterProcessor [label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java"]; + + //persisters + GraphPersistWriter [label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java"]; + + //data + destination [label="http://{host}:{port}/db/data",shape=box]; + + //stream + TwitterFollowingProvider -> TypeConverterProcessor [label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java"]; + TypeConverterProcessor -> 
ActivityConverterProcessor [label="String"]; + ActivityConverterProcessor -> GraphPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"]; + GraphPersistWriter -> destination +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md b/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md new file mode 100644 index 0000000..936efb4 --- /dev/null +++ b/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md @@ -0,0 +1,33 @@ +### TwitterFollowNeo4j + +#### Description: + +Collects friend or follower connections for a set of twitter users to build a graph database in neo4j. + +#### Configuration: + +[TwitterFollowNeo4jIT.conf](TwitterFollowNeo4jIT.conf "TwitterFollowNeo4jIT.conf" ) + +#### Run (SBT): + + sbtx -210 -sbt-create + set resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository" + set libraryDependencies += "org.apache.streams" % "twitter-follow-neo4j" % "0.4-incubating-SNAPSHOT" + set fork := true + set javaOptions +="-Dconfig.file=application.conf" + run org.apache.streams.example.TwitterFollowNeo4j + +#### Run (Docker): + + docker run apachestreams/twitter-follow-neo4j java -cp twitter-follow-neo4j-jar-with-dependencies.jar org.apache.streams.example.TwitterFollowNeo4j + +#### Specification: + +[TwitterFollowNeo4j.dot](TwitterFollowNeo4j.dot "TwitterFollowNeo4j.dot" ) + +#### Diagram: + +![TwitterFollowNeo4j.dot.svg](./TwitterFollowNeo4j.dot.svg) + + +###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/site/markdown/index.md b/local/twitter-follow-neo4j/src/site/markdown/index.md new file mode 100644 index 0000000..3efdc5b --- /dev/null +++ b/local/twitter-follow-neo4j/src/site/markdown/index.md @@ -0,0 +1,42 @@ +### twitter-follow-neo4j + +#### Requirements: + - Authorized Twitter API credentials + - A running Neo4J 3.0.0+ instance + +#### Streams: + +<a href="TwitterFollowNeo4j.html" target="_self">TwitterFollowNeo4j</a> + +#### Build: + + mvn clean package verify + +#### Test: + +Create a local file `application.conf` with valid twitter credentials + + twitter { + oauth { + consumerKey = "" + consumerSecret = "" + accessToken = "" + accessTokenSecret = "" + } + } + +Start up neo4j with docker: + + mvn -PdockerITs docker:start + +Build with integration testing enabled, using your credentials + + mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf" + +Shutdown neo4j when finished: + + mvn -PdockerITs docker:stop + +[JavaDocs](apidocs/index.html "JavaDocs") + +###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot new file mode 100644 index 0000000..2d9e495 --- /dev/null +++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + digraph g { + + //providers + TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"]; + + //processors + TypeConverterProcessor [label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java"]; + ActivityConverterProcessor [label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java"]; + + //persisters + GraphPersistWriter [label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java"]; + + //data + destination [label="http://{host}:{port}/db/data",shape=box]; + + //stream + TwitterFollowingProvider -> TypeConverterProcessor 
[label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java"]; + TypeConverterProcessor -> ActivityConverterProcessor [label="String"]; + ActivityConverterProcessor -> GraphPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"]; + GraphPersistWriter -> destination +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json new file mode 100644 index 0000000..6025640 --- /dev/null +++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.example.graph.TwitterFollowNeo4jConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "twitter": { "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", "required": true }, + "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", "type": "object", "required": true } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot 
b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot new file mode 100644 index 0000000..2d9e495 --- /dev/null +++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + digraph g { + + //providers + TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"]; + + //processors + TypeConverterProcessor [label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java"]; + ActivityConverterProcessor [label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java"]; + + //persisters + GraphPersistWriter [label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java"]; + + //data + destination [label="http://{host}:{port}/db/data",shape=box]; + + //stream + TwitterFollowingProvider -> TypeConverterProcessor [label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java"]; + TypeConverterProcessor -> ActivityConverterProcessor [label="String"]; + ActivityConverterProcessor -> GraphPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"]; + GraphPersistWriter -> destination +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json ---------------------------------------------------------------------- diff --git 
a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json new file mode 100644 index 0000000..ffbd39d --- /dev/null +++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.example.TwitterFollowNeo4jConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "twitter": { "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", "required": true }, + "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", "type": "object", "required": true } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/site.xml ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/site/site.xml b/local/twitter-follow-neo4j/src/site/site.xml new file mode 100644 index 0000000..a25bae0 --- /dev/null +++ b/local/twitter-follow-neo4j/src/site/site.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. 
The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project> + <custom> + <fluidoSkin> + <topBarEnabled>false</topBarEnabled> + <navBarStyle>navbar-inverse</navBarStyle> + <sideBarEnabled>true</sideBarEnabled> + <!--<gitHub>--> + <!--<projectId>apache/incubator-streams-examples</projectId>--> + <!--<ribbonOrientation>right</ribbonOrientation>--> + <!--<ribbonColor>black</ribbonColor>--> + <!--</gitHub>--> + <!--<twitter>--> + <!--<user>ApacheStreams</user>--> + <!--<showUser>true</showUser>--> + <!--<showFollowers>true</showFollowers>--> + <!--</twitter>--> + </fluidoSkin> + </custom> + <body> + <menu name="Configuration"> + <item name="Neo4j" href="../../services/neo4j.html"/> + </menu> + <menu name="Credentials"> + <item name="Twitter" href="../../credentials/twitter.html"/> + </menu> + </body> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java new file mode 100644 index 0000000..2813b08 --- /dev/null +++ b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java @@ -0,0 +1,79 @@ +/* + * Licensed to the
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example.test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.example.TwitterFollowNeo4j; +import org.apache.streams.example.TwitterFollowNeo4jConfiguration; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; + +/** + * Integration test that runs the TwitterFollowNeo4j stream with the settings from + * target/test-classes/TwitterFollowGraphIT.conf and neo4j.properties. + */ +public class TwitterFollowNeo4jIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class); + + protected TwitterFollowNeo4jConfiguration testConfiguration; + + private int count = 0; + + /** + * Builds testConfiguration by layering the test conf file over the values in + * neo4j.properties (read from the working directory) over ConfigFactory.load(), + * then resolving substitutions. + * + * @throws Exception if the conf file is missing or neo4j.properties cannot be read + */ + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new
File("target/test-classes/TwitterFollowGraphIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties graph_properties = new Properties(); + /* try-with-resources closes the properties file even when load() throws */ + try (InputStream graph_stream = new FileInputStream("neo4j.properties")) { + graph_properties.load(graph_stream); + } + Config graphProps = ConfigFactory.parseProperties(graph_properties); + Config typesafe = testResourceConfig.withFallback(graphProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe); + + } + + /** + * Runs the stream with the prepared configuration. No assertions are made on the + * written data; the test passes when run() returns without throwing. + * + * @throws Exception propagated from the stream + */ + @Test + public void testTwitterFollowGraph() throws Exception { + + TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration); + + stream.run(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf new file mode 100644 index 0000000..d4b4aeb --- /dev/null +++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +twitter { + endpoint = "friends" + info = [ + 18055613 + ] + twitter.max_items = 1000 +} +graph { + hostname = ${neo4j.http.host} + port = ${neo4j.http.port} + type = "neo4j" + graph = "data" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java ---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java new file mode 100644 index 0000000..7d87f36 --- /dev/null +++ b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.converter.ActivityConverterProcessor; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.apache.streams.twitter.provider.TwitterTimelineProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retrieves as many posts from a known list of users as twitter API allows. + * + * Converts them to activities, and writes them in activity format to Elasticsearch.
+ */ + +public class TwitterHistoryElasticsearch implements Runnable { + + public final static String STREAMS_ID = "TwitterHistoryElasticsearch"; + + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class); + + /* NOTE(review): mapper is not referenced anywhere in this class. */ + private static final ObjectMapper mapper = new ObjectMapper(); + + /** Stream settings; run() reads the twitter and elasticsearch sections from it. */ + TwitterHistoryElasticsearchConfiguration config; + + /** + * Creates the stream with the TwitterHistoryElasticsearchConfiguration that + * ComponentConfigurator detects from StreamsConfigurator.getConfig(). + */ + public TwitterHistoryElasticsearch() { + this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + + } + + /** + * Creates the stream with an explicit configuration. + * + * @param config configuration whose twitter and elasticsearch sections are used by run() + */ + public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) { + this.config = config; + } + + /** + * Command-line entry point: logs StreamsConfigurator.config, builds the stream with the + * no-argument constructor and starts it on a new Thread. + * + * @param args not read + */ + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); + + TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch(); + + new Thread(history).start(); + + } + + + /** + * Wires and starts the pipeline on a LocalStreamBuilder: + * TwitterTimelineProvider -> ActivityConverterProcessor -> ElasticsearchPersistWriter. + */ + public void run() { + + TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter()); + ActivityConverterProcessor converter = new ActivityConverterProcessor(); + ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch()); + + /* 500 is passed to the builder constructor - presumably a queue size; confirm against LocalStreamBuilder */ + StreamBuilder builder = new LocalStreamBuilder(500); + + builder.newPerpetualStream("provider", provider); + /* the converter is registered with 2 and the writer with 1 - presumably parallel task counts; confirm against StreamBuilder */ + builder.addStreamsProcessor("converter", converter, 2, "provider"); + builder.addStreamsPersistWriter("writer", writer, 1, "converter"); + builder.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java ---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java deleted file mode 100644
index 090b9ed..0000000 --- a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.example.twitter; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.typesafe.config.Config; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.converter.ActivityConverterProcessor; -import org.apache.streams.core.StreamBuilder; -import org.apache.streams.elasticsearch.ElasticsearchConfigurator; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; -import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; -import org.apache.streams.local.builders.LocalStreamBuilder; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.twitter.TwitterStreamConfiguration; -import org.apache.streams.twitter.TwitterUserInformationConfiguration; -import org.apache.streams.twitter.processor.TwitterTypeConverter; -import org.apache.streams.twitter.provider.TwitterConfigurator; -import 
org.apache.streams.twitter.provider.TwitterTimelineProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Retrieves as many posts from a known list of users as twitter API allows. - * - * Converts them to activities, and writes them in activity format to Elasticsearch. - */ - -public class TwitterHistoryElasticsearch implements Runnable { - - public final static String STREAMS_ID = "TwitterHistoryElasticsearch"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class); - - private static final ObjectMapper mapper = new ObjectMapper(); - - TwitterHistoryElasticsearchConfiguration config; - - public TwitterHistoryElasticsearch() { - this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - - } - - public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) { - this.config = config; - } - - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); - - TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch(); - - new Thread(history).start(); - - } - - - public void run() { - - TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter()); - ActivityConverterProcessor converter = new ActivityConverterProcessor(); - ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch()); - - StreamBuilder builder = new LocalStreamBuilder(500); - - builder.newPerpetualStream("provider", provider); - builder.addStreamsProcessor("converter", converter, 2, "provider"); - builder.addStreamsPersistWriter("writer", writer, 1, "converter"); - builder.start(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json 
---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json b/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json index ea9b165..eaf8028 100644 --- a/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json +++ b/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json @@ -4,7 +4,7 @@ "http://www.apache.org/licenses/LICENSE-2.0" ], "type": "object", - "javaType" : "org.apache.streams.example.twitter.TwitterHistoryElasticsearchConfiguration", + "javaType" : "org.apache.streams.example.TwitterHistoryElasticsearchConfiguration", "javaInterfaces": ["java.io.Serializable"], "properties": { "twitter": { "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration", "type": "object", "required": true }, http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java new file mode 100644 index 0000000..b0c9155 --- /dev/null +++ b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example.test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.example.TwitterHistoryElasticsearch; +import org.apache.streams.example.TwitterHistoryElasticsearchConfiguration; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import 
java.util.Properties; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; + +/** + * Example stream that populates elasticsearch with activities from twitter userstream in real-time + */ +public class TwitterHistoryElasticsearchIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class); + + protected TwitterHistoryElasticsearchConfiguration testConfiguration; + protected Client testClient; + + private int count = 0; + + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties es_properties = new Properties(); + InputStream es_stream = new FileInputStream("elasticsearch.properties"); + es_properties.load(es_stream); + Config esProps = ConfigFactory.parseProperties(es_properties); + Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe); + testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex()); + IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertFalse(indicesExistsResponse.isExists()); + + } + + @Test + public void testTwitterHistoryElasticsearch() throws Exception { + + TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration); + + stream.run(); + + // assert lines in file + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getElasticsearch().getIndex()) + .setTypes(testConfiguration.getElasticsearch().getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); + + count = (int)countResponse.getHits().getTotalHits(); + + assertNotEquals(count, 0); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java deleted file mode 100644 index b8e1b64..0000000 --- a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.example; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.example.twitter.TwitterHistoryElasticsearch; -import org.apache.streams.example.twitter.TwitterHistoryElasticsearchConfiguration; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; - -import static org.junit.Assert.assertFalse; -import static 
org.junit.Assert.assertNotEquals; - -/** - * Example stream that populates elasticsearch with activities from twitter userstream in real-time - */ -public class TwitterHistoryElasticsearchIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class); - - protected TwitterHistoryElasticsearchConfiguration testConfiguration; - protected Client testClient; - - private int count = 0; - - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Properties es_properties = new Properties(); - InputStream es_stream = new FileInputStream("elasticsearch.properties"); - es_properties.load(es_stream); - Config esProps = ConfigFactory.parseProperties(es_properties); - Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); - testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe); - testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient(); - - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertFalse(indicesExistsResponse.isExists()); - - } - - @Test - 
public void testTwitterHistoryElasticsearch() throws Exception { - - TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration); - - stream.run(); - - // assert lines in file - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getElasticsearch().getIndex()) - .setTypes(testConfiguration.getElasticsearch().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); - - count = (int)countResponse.getHits().getTotalHits(); - - assertNotEquals(count, 0); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java new file mode 100644 index 0000000..f1e776a --- /dev/null +++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.converter.ActivityConverterProcessor; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; +import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration; +import org.apache.streams.filters.VerbDefinitionDropFilter; +import org.apache.streams.filters.VerbDefinitionKeepFilter; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.provider.TwitterStreamProvider; +import org.apache.streams.verbs.ObjectCombination; +import org.apache.streams.verbs.VerbDefinition; +import org.elasticsearch.common.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * Example stream that populates elasticsearch with activities from twitter userstream in real-time + */ +public class TwitterUserstreamElasticsearch implements Runnable { + + public final static String STREAMS_ID = "TwitterUserstreamElasticsearch"; + + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class); + + /* this pattern will match any/only deletes */ + private static VerbDefinition 
deleteVerbDefinition = + new VerbDefinition() + .withValue("delete") + .withObjects(Lists.newArrayList(new ObjectCombination())); + + TwitterUserstreamElasticsearchConfiguration config; + + public TwitterUserstreamElasticsearch() { + this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + + } + + public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) { + this.config = config; + } + + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); + + TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch(); + new Thread(userstream).start(); + + } + + @Override + public void run() { + + TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter(); + ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch(); + + TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration); + ActivityConverterProcessor converter = new ActivityConverterProcessor(); + VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition)); + ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration); + VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition)); + SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor(); + ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration); + + Map<String, Object> streamConfig = Maps.newHashMap(); + streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000); + StreamBuilder builder = new LocalStreamBuilder(25, streamConfig); + + builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream); + builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID); + 
builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter"); + builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor"); + builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter"); + builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor"); + builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor"); + + builder.start(); + + } + + protected class SetDeleteIdProcessor implements StreamsProcessor { + + public String getId() { + return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor"; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + + Preconditions.checkArgument(entry.getDocument() instanceof Activity); + String id = entry.getId(); + // replace delete with post in id + // ensure ElasticsearchPersistDeleter will remove original post if present + id = Strings.replace(id, "delete", "post"); + entry.setId(id); + + return Lists.newArrayList(entry); + } + + @Override + public void prepare(Object configurationObject) { + + + } + + @Override + public void cleanUp() { + + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java deleted file mode 100644 index c483742..0000000 --- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.example; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.converter.ActivityConverterProcessor; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; -import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; -import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration; -import org.apache.streams.filters.VerbDefinitionDropFilter; -import org.apache.streams.filters.VerbDefinitionKeepFilter; -import org.apache.streams.local.builders.LocalStreamBuilder; -import org.apache.streams.core.StreamBuilder; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.twitter.TwitterStreamConfiguration; -import org.apache.streams.twitter.provider.TwitterStreamProvider; -import org.apache.streams.verbs.ObjectCombination; -import org.apache.streams.verbs.VerbDefinition; 
-import org.elasticsearch.common.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -/** - * Example stream that populates elasticsearch with activities from twitter userstream in real-time - */ -public class TwitterUserstreamElasticsearch implements Runnable { - - public final static String STREAMS_ID = "TwitterUserstreamElasticsearch"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class); - - /* this pattern will match any/only deletes */ - private static VerbDefinition deleteVerbDefinition = - new VerbDefinition() - .withValue("delete") - .withObjects(Lists.newArrayList(new ObjectCombination())); - - TwitterUserstreamElasticsearchConfiguration config; - - public TwitterUserstreamElasticsearch() { - this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - - } - - public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) { - this.config = config; - } - - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); - - TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch(); - new Thread(userstream).start(); - - } - - @Override - public void run() { - - TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter(); - ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch(); - - TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration); - ActivityConverterProcessor converter = new ActivityConverterProcessor(); - VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition)); - ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration); - VerbDefinitionKeepFilter deleteOnlyProcessor = new 
VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition)); - SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor(); - ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration); - - Map<String, Object> streamConfig = Maps.newHashMap(); - streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000); - StreamBuilder builder = new LocalStreamBuilder(25, streamConfig); - - builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream); - builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID); - builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter"); - builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor"); - builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter"); - builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor"); - builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor"); - - builder.start(); - - } - - protected class SetDeleteIdProcessor implements StreamsProcessor { - - public String getId() { - return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor"; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - Preconditions.checkArgument(entry.getDocument() instanceof Activity); - String id = entry.getId(); - // replace delete with post in id - // ensure ElasticsearchPersistDeleter will remove original post if present - id = Strings.replace(id, "delete", "post"); - entry.setId(id); - - return Lists.newArrayList(entry); - } - - @Override - public void prepare(Object configurationObject) { - - - } - - @Override - public void cleanUp() { - - } - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json index 6a25850..7261439 100644 --- a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json +++ b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json @@ -4,7 +4,7 @@ "http://www.apache.org/licenses/LICENSE-2.0" ], "type": "object", - "javaType" : "org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration", + "javaType" : "org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration", "javaInterfaces": ["java.io.Serializable"], "properties": { "twitter": { "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration", "type": "object", "required": true }, http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java new file mode 100644 index 0000000..7ba9940 --- /dev/null +++ 
b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.example.test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration; +import org.apache.streams.example.TwitterUserstreamElasticsearch; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import 
org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; + +/** + * Integration test for the {@link TwitterUserstreamElasticsearch} example stream: runs the + * stream against the Elasticsearch cluster described by the test configuration and checks + * that documents arrive in the configured index and type. + */ +public class TwitterUserstreamElasticsearchIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class); + + protected TwitterUserstreamElasticsearchConfiguration testConfiguration; + protected Client testClient; + + private int count = 0; + + /** + * Assembles the test configuration and the Elasticsearch client, then checks preconditions. + * + * Settings from TwitterUserstreamElasticsearchIT.conf take precedence over + * elasticsearch.properties, which in turn takes precedence over ConfigFactory.load(). + * The cluster must not report RED health and the target index must not exist yet. + * + * @throws Exception if a configuration file cannot be read + */ + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf"); + /* JUnit assertion instead of the assert keyword, so the check does not depend on -ea */ + assertTrue(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties es_properties = new Properties(); + /* try-with-resources closes the properties stream even when load() throws */ + try (InputStream es_stream = new FileInputStream("elasticsearch.properties")) { + es_properties.load(es_stream); + } + Config esProps = ConfigFactory.parseProperties(es_properties); + Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe); + testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + /* unexpected value first, actual value second, matching assertNotEquals(unexpected, actual) */ + assertNotEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus()); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertFalse(indicesExistsResponse.isExists()); + + } + + /** + * Runs the stream, then searches the configured index and type and asserts that the + * total hit count is non-zero. + * + * @throws Exception if running the stream or querying Elasticsearch raises one + */ + @Test + public void testReindex() throws Exception { + + TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration); + + stream.run(); + + // assert that the stream indexed at least one document + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getElasticsearch().getIndex()) + .setTypes(testConfiguration.getElasticsearch().getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); + + count = (int)countResponse.getHits().getTotalHits(); + + assertNotEquals(0, count); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java deleted file mode 100644 index 2f524f0..0000000 --- a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.example.twitter.test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.example.TwitterUserstreamElasticsearch; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import 
java.io.InputStream; -import java.util.Properties; - -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; - -/** - * Integration test for the {@link TwitterUserstreamElasticsearch} example stream: runs the - * stream against the Elasticsearch cluster described by the test configuration and checks - * that documents arrive in the configured index and type. - */ -public class TwitterUserstreamElasticsearchIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class); - - protected TwitterUserstreamElasticsearchConfiguration testConfiguration; - protected Client testClient; - - private int count = 0; - /** - * Assembles the test configuration and the Elasticsearch client, then checks preconditions. - * - * Settings from TwitterUserstreamElasticsearchIT.conf take precedence over - * elasticsearch.properties, which in turn takes precedence over ConfigFactory.load(). - * The cluster must not report RED health and the target index must not exist yet. - * - * @throws Exception if a configuration file cannot be read - */ - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf"); - assert(conf_file.exists()); /* NOTE(review): language-level assert, only evaluated when the JVM runs with -ea */ - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Properties es_properties = new Properties(); - InputStream es_stream = new FileInputStream("elasticsearch.properties"); /* NOTE(review): es_stream is never closed in this method */ - es_properties.load(es_stream); - Config esProps = ConfigFactory.parseProperties(es_properties); - Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); /* result is not used afterwards */ - testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe); - testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient(); - - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); /* NOTE(review): arguments are in (actual, unexpected) order */ - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex()); 
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertFalse(indicesExistsResponse.isExists()); - - } - /** - * Runs the stream, then searches the configured index and type and asserts that the - * total hit count is non-zero. - * - * @throws Exception if running the stream or querying Elasticsearch raises one - */ - @Test - public void testReindex() throws Exception { - - TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration); - - stream.run(); - - // assert that the stream indexed at least one document - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getElasticsearch().getIndex()) - .setTypes(testConfiguration.getElasticsearch().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); - - count = (int)countResponse.getHits().getTotalHits(); - - assertNotEquals(count, 0); - } -}