http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
 
b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
new file mode 100644
index 0000000..34ac8c4
--- /dev/null
+++ 
b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.converter.ActivityConverterProcessorConfiguration;
+import org.apache.streams.converter.TypeConverterProcessor;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
+import org.apache.streams.graph.GraphHttpConfiguration;
+import org.apache.streams.graph.GraphHttpPersistWriter;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.converter.TwitterFollowActivityConverter;
+import org.apache.streams.twitter.provider.TwitterFollowingProvider;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collects friend and follow connections for a set of twitter users and 
builds a graph
+ * database in neo4j.
+ */
+public class TwitterFollowNeo4j implements Runnable {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterFollowNeo4j.class);
+
+    TwitterFollowNeo4jConfiguration config;
+
+    public TwitterFollowNeo4j() {
+        this(new 
ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+    }
+
+    public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) {
+        this.config = config;
+    }
+
+    public void run() {
+
+        TwitterFollowingConfiguration twitterFollowingConfiguration = 
config.getTwitter();
+        TwitterFollowingProvider followingProvider = new 
TwitterFollowingProvider(twitterFollowingConfiguration);
+        TypeConverterProcessor converter = new 
TypeConverterProcessor(String.class);
+
+        ActivityConverterProcessorConfiguration 
activityConverterProcessorConfiguration =
+                new ActivityConverterProcessorConfiguration()
+                        
.withClassifiers(Lists.newArrayList((DocumentClassifier) new 
TwitterDocumentClassifier()))
+                        .withConverters(Lists.newArrayList((ActivityConverter) 
new TwitterFollowActivityConverter()));
+        ActivityConverterProcessor activity = new 
ActivityConverterProcessor(activityConverterProcessorConfiguration);
+
+        GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
+        GraphHttpPersistWriter graphPersistWriter = new 
GraphHttpPersistWriter(graphWriterConfiguration);
+
+        StreamBuilder builder = new LocalStreamBuilder();
+        builder.newPerpetualStream(TwitterFollowingProvider.STREAMS_ID, 
followingProvider);
+        builder.addStreamsProcessor("converter", converter, 1, 
TwitterFollowingProvider.STREAMS_ID);
+        builder.addStreamsProcessor("activity", activity, 1, "converter");
+        builder.addStreamsPersistWriter("graph", graphPersistWriter, 1, 
"activity");
+
+        builder.start();
+    }
+
+    public static void main(String[] args) {
+
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        TwitterFollowNeo4j stream = new TwitterFollowNeo4j();
+
+        stream.run();
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
 
b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
new file mode 100644
index 0000000..ffbd39d
--- /dev/null
+++ 
b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.example.TwitterFollowNeo4jConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": { "javaType": 
"org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", 
"required": true },
+    "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", 
"type": "object", "required": true }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot 
b/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot
new file mode 100644
index 0000000..2d9e495
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //providers
+  TwitterFollowingProvider 
[label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java";];
+
+  //processors
+  TypeConverterProcessor 
[label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java";];
+  ActivityConverterProcessor 
[label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java";];
+
+  //persisters
+  GraphPersistWriter 
[label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java";];
+
+  //data
+  destination [label="http://{host}:{port}/db/data",shape=box];
+
+  //stream
+  TwitterFollowingProvider -> TypeConverterProcessor 
[label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java";];
+  TypeConverterProcessor -> ActivityConverterProcessor [label="String"];
+  ActivityConverterProcessor -> GraphPersistWriter 
[label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json";];
+  GraphPersistWriter -> destination
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md 
b/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md
new file mode 100644
index 0000000..936efb4
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md
@@ -0,0 +1,33 @@
+### TwitterFollowNeo4j
+
+#### Description:
+
+Collects friend or follower connections for a set of twitter users to build a 
graph database in neo4j.
+
+#### Configuration:
+
+[TwitterFollowNeo4jIT.conf](TwitterFollowNeo4jIT.conf 
"TwitterFollowNeo4jIT.conf" )
+
+#### Run (SBT):
+
+    sbtx -210 -sbt-create
+    set resolvers += "Local Maven Repository" at 
"file://"+Path.userHome.absolutePath+"/.m2/repository"
+    set libraryDependencies += "org.apache.streams" % "twitter-follow-neo4j" % 
"0.4-incubating-SNAPSHOT"
+    set fork := true
+    set javaOptions +="-Dconfig.file=application.conf"
+    run org.apache.streams.example.TwitterFollowNeo4j
+
+#### Run (Docker):
+
+    docker run apachestreams/twitter-follow-neo4j java -cp 
twitter-follow-neo4j-jar-with-dependencies.jar 
org.apache.streams.example.TwitterFollowNeo4j
+
+#### Specification:
+
+[TwitterFollowNeo4j.dot](TwitterFollowNeo4j.dot "TwitterFollowNeo4j.dot" )
+
+#### Diagram:
+
+![TwitterFollowNeo4j.dot.svg](./TwitterFollowNeo4j.dot.svg)
+
+
+###### Licensed under Apache License 2.0 - 
http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/markdown/index.md 
b/local/twitter-follow-neo4j/src/site/markdown/index.md
new file mode 100644
index 0000000..3efdc5b
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/markdown/index.md
@@ -0,0 +1,42 @@
+### twitter-follow-neo4j
+
+#### Requirements:
+ - Authorized Twitter API credentials
+ - A running Neo4J 3.0.0+ instance
+
+#### Streams:
+
+<a href="TwitterFollowNeo4j.html" target="_self">TwitterFollowNeo4j</a>
+
+#### Build:
+
+    mvn clean package verify
+
+#### Test:
+
+Create a local file `application.conf` with valid twitter credentials
+
+    twitter {
+      oauth {
+        consumerKey = ""
+        consumerSecret = ""
+        accessToken = ""
+        accessTokenSecret = ""
+      }
+    }
+
+Start up neo4j with docker:
+
+    mvn -PdockerITs docker:start
+    
+Build with integration testing enabled, using your credentials
+
+    mvn clean test verify -DskipITs=false 
-DargLine="-Dconfig.file=`pwd`/application.conf"
+
+Shutdown neo4j when finished:
+
+    mvn -PdockerITs docker:stop
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - 
http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot 
b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot
new file mode 100644
index 0000000..2d9e495
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //providers
+  TwitterFollowingProvider 
[label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java";];
+
+  //processors
+  TypeConverterProcessor 
[label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java";];
+  ActivityConverterProcessor 
[label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java";];
+
+  //persisters
+  GraphPersistWriter 
[label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java";];
+
+  //data
+  destination [label="http://{host}:{port}/db/data",shape=box];
+
+  //stream
+  TwitterFollowingProvider -> TypeConverterProcessor 
[label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java";];
+  TypeConverterProcessor -> ActivityConverterProcessor [label="String"];
+  ActivityConverterProcessor -> GraphPersistWriter 
[label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json";];
+  GraphPersistWriter -> destination
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json
 
b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json
new file mode 100644
index 0000000..6025640
--- /dev/null
+++ 
b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : 
"org.apache.streams.example.graph.TwitterFollowNeo4jConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": { "javaType": 
"org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", 
"required": true },
+    "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", 
"type": "object", "required": true }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot 
b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot
new file mode 100644
index 0000000..2d9e495
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //providers
+  TwitterFollowingProvider 
[label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java";];
+
+  //processors
+  TypeConverterProcessor 
[label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java";];
+  ActivityConverterProcessor 
[label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java";];
+
+  //persisters
+  GraphPersistWriter 
[label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java";];
+
+  //data
+  destination [label="http://{host}:{port}/db/data",shape=box];
+
+  //stream
+  TwitterFollowingProvider -> TypeConverterProcessor 
[label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java";];
+  TypeConverterProcessor -> ActivityConverterProcessor [label="String"];
+  ActivityConverterProcessor -> GraphPersistWriter 
[label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json";];
+  GraphPersistWriter -> destination
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json
 
b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json
new file mode 100644
index 0000000..ffbd39d
--- /dev/null
+++ 
b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.example.TwitterFollowNeo4jConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": { "javaType": 
"org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", 
"required": true },
+    "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", 
"type": "object", "required": true }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/site.xml
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/site.xml 
b/local/twitter-follow-neo4j/src/site/site.xml
new file mode 100644
index 0000000..a25bae0
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/site.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project>
+    <custom>
+        <fluidoSkin>
+            <topBarEnabled>false</topBarEnabled>
+            <navBarStyle>navbar-inverse</navBarStyle>
+            <sideBarEnabled>true</sideBarEnabled>
+            <!--<gitHub>-->
+            <!--<projectId>apache/incubator-streams-examples</projectId>-->
+            <!--<ribbonOrientation>right</ribbonOrientation>-->
+            <!--<ribbonColor>black</ribbonColor>-->
+            <!--</gitHub>-->
+            <!--<twitter>-->
+            <!--<user>ApacheStreams</user>-->
+            <!--<showUser>true</showUser>-->
+            <!--<showFollowers>true</showFollowers>-->
+            <!--</twitter>-->
+        </fluidoSkin>
+    </custom>
+    <body>
+        <menu name="Configuration">
+            <item name="Neo4j" href="../../services/neo4j.html"/>
+        </menu>
+        <menu name="Credentials">
+            <item name="Twitter" href="../../credentials/twitter.html"/>
+        </menu>
+    </body>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
 
b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
new file mode 100644
index 0000000..2813b08
--- /dev/null
+++ 
b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.example.TwitterFollowNeo4j;
+import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test that runs the TwitterFollowNeo4j stream with the test configuration.
+ */
+public class TwitterFollowNeo4jIT {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
+
+    protected TwitterFollowNeo4jConfiguration testConfiguration;
+
+    private int count = 0;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new 
File("target/test-classes/TwitterFollowGraphIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties graph_properties  = new Properties();
+        InputStream graph_stream  = new FileInputStream("neo4j.properties");
+        graph_properties.load(graph_stream);
+        Config graphProps  = ConfigFactory.parseProperties(graph_properties);
+        Config typesafe  = 
testResourceConfig.withFallback(graphProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = 
StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new 
ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
+
+    }
+
+    @Test
+    public void testTwitterFollowGraph() throws Exception {
+
+        TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration);
+
+        stream.run();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf
----------------------------------------------------------------------
diff --git 
a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf 
b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf
new file mode 100644
index 0000000..d4b4aeb
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+twitter {
+  endpoint = "friends"
+  info = [
+    18055613
+  ]
+  twitter.max_items = 1000
+}
+graph {
+  hostname = ${neo4j.http.host}
+  port = ${neo4j.http.port}
+  type = "neo4j"
+  graph = "data"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
 
b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
new file mode 100644
index 0000000..7d87f36
--- /dev/null
+++ 
b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.twitter.provider.TwitterTimelineProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieves as many posts from a known list of users as twitter API allows.
+ *
+ * Converts them to activities, and writes them in activity format to 
Elasticsearch.
+ */
+
+public class TwitterHistoryElasticsearch implements Runnable {
+
+    public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    TwitterHistoryElasticsearchConfiguration config;
+
+    public TwitterHistoryElasticsearch() {
+        this(new 
ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+    }
+
+    public 
TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
+        this.config = config;
+    }
+
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        TwitterHistoryElasticsearch history = new 
TwitterHistoryElasticsearch();
+
+        new Thread(history).start();
+
+    }
+
+
+    public void run() {
+
+        TwitterTimelineProvider provider = new 
TwitterTimelineProvider(config.getTwitter());
+        ActivityConverterProcessor converter = new 
ActivityConverterProcessor();
+        ElasticsearchPersistWriter writer = new 
ElasticsearchPersistWriter(config.getElasticsearch());
+
+        StreamBuilder builder = new LocalStreamBuilder(500);
+
+        builder.newPerpetualStream("provider", provider);
+        builder.addStreamsProcessor("converter", converter, 2, "provider");
+        builder.addStreamsPersistWriter("writer", writer, 1, "converter");
+        builder.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java
 
b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java
deleted file mode 100644
index 090b9ed..0000000
--- 
a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.twitter;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.ActivityConverterProcessor;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.apache.streams.twitter.processor.TwitterTypeConverter;
-import org.apache.streams.twitter.provider.TwitterConfigurator;
-import org.apache.streams.twitter.provider.TwitterTimelineProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Retrieves as many posts from a known list of users as twitter API allows.
- *
- * Converts them to activities, and writes them in activity format to 
Elasticsearch.
- */
-
-public class TwitterHistoryElasticsearch implements Runnable {
-
-    public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
-
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    TwitterHistoryElasticsearchConfiguration config;
-
-    public TwitterHistoryElasticsearch() {
-        this(new 
ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
-    }
-
-    public 
TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
-        this.config = config;
-    }
-
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
-
-        TwitterHistoryElasticsearch history = new 
TwitterHistoryElasticsearch();
-
-        new Thread(history).start();
-
-    }
-
-
-    public void run() {
-
-        TwitterTimelineProvider provider = new 
TwitterTimelineProvider(config.getTwitter());
-        ActivityConverterProcessor converter = new 
ActivityConverterProcessor();
-        ElasticsearchPersistWriter writer = new 
ElasticsearchPersistWriter(config.getElasticsearch());
-
-        StreamBuilder builder = new LocalStreamBuilder(500);
-
-        builder.newPerpetualStream("provider", provider);
-        builder.addStreamsProcessor("converter", converter, 2, "provider");
-        builder.addStreamsPersistWriter("writer", writer, 1, "converter");
-        builder.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git 
a/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
 
b/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
index ea9b165..eaf8028 100644
--- 
a/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
+++ 
b/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
@@ -4,7 +4,7 @@
         "http://www.apache.org/licenses/LICENSE-2.0"
     ],
     "type": "object",
-    "javaType" : 
"org.apache.streams.example.twitter.TwitterHistoryElasticsearchConfiguration",
+    "javaType" : 
"org.apache.streams.example.TwitterHistoryElasticsearchConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
         "twitter": { "javaType": 
"org.apache.streams.twitter.TwitterUserInformationConfiguration", "type": 
"object", "required": true },

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
 
b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
new file mode 100644
index 0000000..b0c9155
--- /dev/null
+++ 
b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.TwitterHistoryElasticsearch;
+import org.apache.streams.example.TwitterHistoryElasticsearchConfiguration;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test for {@link TwitterHistoryElasticsearch}: runs the stream with
+ * the test configuration and verifies that documents arrive in the configured
+ * Elasticsearch index.
+ */
+public class TwitterHistoryElasticsearchIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
+
+    protected TwitterHistoryElasticsearchConfiguration testConfiguration;
+    protected Client testClient;
+
+    private int count = 0;
+
+    /**
+     * Builds the test configuration and Elasticsearch client, then checks the
+     * preconditions for the test: the cluster is not RED and the target index
+     * does not exist yet.
+     *
+     * Configuration precedence (highest first): the test .conf file, then
+     * elasticsearch.properties, then the default typesafe configuration.
+     *
+     * @throws Exception if a configuration file cannot be read or the cluster
+     *                   cannot be reached
+     */
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        // Close the properties stream once loaded instead of leaking the handle.
+        try (InputStream es_stream = new FileInputStream("elasticsearch.properties")) {
+            es_properties.load(es_stream);
+        }
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        // NOTE(review): the result is unused here; the call is kept in case
+        // detectConfiguration has side effects - confirm and remove if not.
+        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        // JUnit order is (unexpected, actual), which keeps failure messages accurate.
+        assertNotEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus());
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertFalse(indicesExistsResponse.isExists());
+
+    }
+
+    /**
+     * Runs the stream on the calling thread and asserts that a search of the
+     * configured index and type reports at least one hit.
+     *
+     * @throws Exception if the stream or the search request fails
+     */
+    @Test
+    public void testTwitterHistoryElasticsearch() throws Exception {
+
+        TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
+
+        stream.run();
+
+        // assert that documents were written to the index
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+                .setTypes(testConfiguration.getElasticsearch().getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        count = (int)countResponse.getHits().getTotalHits();
+
+        assertNotEquals(0, count);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java
 
b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java
deleted file mode 100644
index b8e1b64..0000000
--- 
a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.example;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.example.twitter.TwitterHistoryElasticsearch;
-import 
org.apache.streams.example.twitter.TwitterHistoryElasticsearchConfiguration;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Example stream that populates elasticsearch with activities from twitter 
userstream in real-time
- */
-public class TwitterHistoryElasticsearchIT {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
-
-    protected TwitterHistoryElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
-
-    private int count = 0;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new 
File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-        Properties es_properties  = new Properties();
-        InputStream es_stream  = new 
FileInputStream("elasticsearch.properties");
-        es_properties.load(es_stream);
-        Config esProps  = ConfigFactory.parseProperties(es_properties);
-        Config typesafe  = 
testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
-        StreamsConfiguration streams  = 
StreamsConfigurator.detectConfiguration(typesafe);
-        testConfiguration = new 
ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = new 
ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
-
-        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
-
-        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
-        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertFalse(indicesExistsResponse.isExists());
-
-    }
-
-    @Test
-    public void testTwitterHistoryElasticsearch() throws Exception {
-
-        TwitterHistoryElasticsearch stream = new 
TwitterHistoryElasticsearch(testConfiguration);
-
-        stream.run();
-
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
-                .setTypes(testConfiguration.getElasticsearch().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        count = (int)countResponse.getHits().getTotalHits();
-
-        assertNotEquals(count, 0);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
 
b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
new file mode 100644
index 0000000..f1e776a
--- /dev/null
+++ 
b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration;
+import org.apache.streams.filters.VerbDefinitionDropFilter;
+import org.apache.streams.filters.VerbDefinitionKeepFilter;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.provider.TwitterStreamProvider;
+import org.apache.streams.verbs.ObjectCombination;
+import org.apache.streams.verbs.VerbDefinition;
+import org.elasticsearch.common.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Example stream that populates elasticsearch with activities from twitter userstream in real-time.
+ *
+ * Activities with the "delete" verb are not indexed; they are routed down a
+ * second branch that removes the matching document from the index.
+ */
+public class TwitterUserstreamElasticsearch implements Runnable {
+
+    public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
+
+    /* this pattern will match any/only deletes */
+    private static final VerbDefinition deleteVerbDefinition =
+            new VerbDefinition()
+            .withValue("delete")
+            .withObjects(Lists.newArrayList(new ObjectCombination()));
+
+    TwitterUserstreamElasticsearchConfiguration config;
+
+    /**
+     * Creates the stream with the configuration that ComponentConfigurator
+     * detects from StreamsConfigurator.getConfig().
+     */
+    public TwitterUserstreamElasticsearch() {
+        this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+    }
+
+    /**
+     * Creates the stream with an explicitly supplied configuration.
+     *
+     * @param config configuration stored for later use by run()
+     */
+    public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Command-line entry point: logs the active StreamsConfigurator config,
+     * then runs a default-configured stream on a new thread.
+     *
+     * @param args command-line arguments (not read)
+     */
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
+        new Thread(userstream).start();
+
+    }
+
+    /**
+     * Assembles and starts the pipeline. After conversion the stream forks:
+     * one branch drops deletes and writes everything else to Elasticsearch,
+     * the other keeps only deletes, rewrites their ids and hands them to the
+     * Elasticsearch deleter.
+     */
+    @Override
+    public void run() {
+
+        TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
+        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
+
+        TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
+        ActivityConverterProcessor converter = new ActivityConverterProcessor();
+        VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
+        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
+        VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
+        SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
+        ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        // NOTE(review): 12 * 60 * 1000 is presumably 12 minutes in milliseconds -
+        // confirm the unit expected for LocalStreamBuilder.TIMEOUT_KEY.
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
+        StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
+
+        builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
+        builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
+        // branch 1: everything except deletes is written to the index
+        builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
+        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
+        // branch 2: deletes only, with ids rewritten, go to the deleter
+        builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
+        builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor");
+        builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor");
+
+        builder.start();
+
+    }
+
+    /**
+     * Rewrites the id of a delete activity so that it names the post being
+     * deleted. Declared static because it uses no state of the enclosing
+     * stream, which avoids carrying a hidden reference to it.
+     */
+    protected static class SetDeleteIdProcessor implements StreamsProcessor {
+
+        public String getId() {
+            return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
+        }
+
+        /**
+         * Replaces "delete" with "post" in the datum id.
+         *
+         * @param entry datum whose document must be an Activity
+         * @return a single-element list holding the same, modified datum
+         * @throws IllegalArgumentException if the document is not an Activity
+         */
+        @Override
+        public List<StreamsDatum> process(StreamsDatum entry) {
+
+            Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+            String id = entry.getId();
+            // replace delete with post in id
+            // ensure ElasticsearchPersistDeleter will remove original post if present
+            id = Strings.replace(id, "delete", "post");
+            entry.setId(id);
+
+            return Lists.newArrayList(entry);
+        }
+
+        /** No setup is required. */
+        @Override
+        public void prepare(Object configurationObject) {
+
+
+        }
+
+        /** No resources to release. */
+        @Override
+        public void cleanUp() {
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
 
b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
deleted file mode 100644
index c483742..0000000
--- 
a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.example;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.ActivityConverterProcessor;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import 
org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
-import org.apache.streams.filters.VerbDefinitionDropFilter;
-import org.apache.streams.filters.VerbDefinitionKeepFilter;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.provider.TwitterStreamProvider;
-import org.apache.streams.verbs.ObjectCombination;
-import org.apache.streams.verbs.VerbDefinition;
-import org.elasticsearch.common.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Example stream that populates elasticsearch with activities from twitter 
userstream in real-time
- */
-public class TwitterUserstreamElasticsearch implements Runnable {
-
-    public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
-
-    /* this pattern will match any/only deletes */
-    private static VerbDefinition deleteVerbDefinition =
-            new VerbDefinition()
-            .withValue("delete")
-            .withObjects(Lists.newArrayList(new ObjectCombination()));
-
-    TwitterUserstreamElasticsearchConfiguration config;
-
-    public TwitterUserstreamElasticsearch() {
-        this(new 
ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
-    }
-
-    public 
TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration 
config) {
-        this.config = config;
-    }
-
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
-
-        TwitterUserstreamElasticsearch userstream = new 
TwitterUserstreamElasticsearch();
-        new Thread(userstream).start();
-
-    }
-
-    @Override
-    public void run() {
-
-        TwitterStreamConfiguration twitterStreamConfiguration = 
config.getTwitter();
-        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = 
config.getElasticsearch();
-
-        TwitterStreamProvider stream = new 
TwitterStreamProvider(twitterStreamConfiguration);
-        ActivityConverterProcessor converter = new 
ActivityConverterProcessor();
-        VerbDefinitionDropFilter noDeletesProcessor = new 
VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
-        ElasticsearchPersistWriter writer = new 
ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
-        VerbDefinitionKeepFilter deleteOnlyProcessor = new 
VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
-        SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
-        ElasticsearchPersistDeleter deleter = new 
ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
-
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
-
-        builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
-        builder.addStreamsProcessor("converter", converter, 2, 
TwitterStreamProvider.STREAMS_ID);
-        builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 
1, "converter");
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, 
writer, 1, "NoDeletesProcessor");
-        builder.addStreamsProcessor("DeleteOnlyProcessor", 
deleteOnlyProcessor, 1, "converter");
-        builder.addStreamsProcessor("SetDeleteIdProcessor", 
setDeleteIdProcessor, 1, "DeleteOnlyProcessor");
-        builder.addStreamsPersistWriter("deleter", deleter, 1, 
"SetDeleteIdProcessor");
-
-        builder.start();
-
-    }
-
-    protected class SetDeleteIdProcessor implements StreamsProcessor {
-
-        public String getId() {
-            return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
-        }
-
-        @Override
-        public List<StreamsDatum> process(StreamsDatum entry) {
-
-            Preconditions.checkArgument(entry.getDocument() instanceof 
Activity);
-            String id = entry.getId();
-            // replace delete with post in id
-            // ensure ElasticsearchPersistDeleter will remove original post if 
present
-            id = Strings.replace(id, "delete", "post");
-            entry.setId(id);
-
-            return Lists.newArrayList(entry);
-        }
-
-        @Override
-        public void prepare(Object configurationObject) {
-
-
-        }
-
-        @Override
-        public void cleanUp() {
-
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git 
a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
 
b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
index 6a25850..7261439 100644
--- 
a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
+++ 
b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
@@ -4,7 +4,7 @@
     "http://www.apache.org/licenses/LICENSE-2.0"
   ],
   "type": "object",
-  "javaType" : 
"org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration",
+  "javaType" : 
"org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration",
   "javaInterfaces": ["java.io.Serializable"],
   "properties": {
     "twitter": { "javaType": 
"org.apache.streams.twitter.TwitterStreamConfiguration", "type": "object", 
"required": true },

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
 
b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
new file mode 100644
index 0000000..7ba9940
--- /dev/null
+++ 
b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration;
+import org.apache.streams.example.TwitterUserstreamElasticsearch;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test: runs TwitterUserstreamElasticsearch and checks that the configured Elasticsearch index is not empty
+ */
+public class TwitterUserstreamElasticsearchIT {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
+
+    protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
+    protected Client testClient;
+
+    private int count = 0;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new 
File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new 
FileInputStream("elasticsearch.properties");
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = 
testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = 
StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new 
ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
+        testClient = new 
ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
+
+        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertFalse(indicesExistsResponse.isExists());
+
+    }
+
+    @Test
+    public void testReindex() throws Exception {
+
+        TwitterUserstreamElasticsearch stream = new 
TwitterUserstreamElasticsearch(testConfiguration);
+
+        stream.run();
+
+        // assert that at least one document was written to the index
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+                .setTypes(testConfiguration.getElasticsearch().getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        count = (int)countResponse.getHits().getTotalHits();
+
+        assertNotEquals(count, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
----------------------------------------------------------------------
diff --git 
a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
 
b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
deleted file mode 100644
index 2f524f0..0000000
--- 
a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import 
org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.example.TwitterUserstreamElasticsearch;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying documents between two indexes on same cluster
- */
-public class TwitterUserstreamElasticsearchIT {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
-
-    protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
-
-    private int count = 0;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new 
File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-        Properties es_properties  = new Properties();
-        InputStream es_stream  = new 
FileInputStream("elasticsearch.properties");
-        es_properties.load(es_stream);
-        Config esProps  = ConfigFactory.parseProperties(es_properties);
-        Config typesafe  = 
testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
-        StreamsConfiguration streams  = 
StreamsConfigurator.detectConfiguration(typesafe);
-        testConfiguration = new 
ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = new 
ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
-
-        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
-
-        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
-        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertFalse(indicesExistsResponse.isExists());
-
-    }
-
-    @Test
-    public void testReindex() throws Exception {
-
-        TwitterUserstreamElasticsearch stream = new 
TwitterUserstreamElasticsearch(testConfiguration);
-
-        stream.run();
-
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
-                .setTypes(testConfiguration.getElasticsearch().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        count = (int)countResponse.getHits().getTotalHits();
-
-        assertNotEquals(count, 0);
-    }
-}

Reply via email to