Repository: storm
Updated Branches:
  refs/heads/master 4966d7a69 -> 3f84dcca0


STORM-2658: Extract storm-kafka-client examples to storm-kafka-client-examples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9b84248d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9b84248d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9b84248d

Branch: refs/heads/master
Commit: 9b84248d4c496340f60253bbcabae1b097d4dd23
Parents: 3f386c9
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Tue Jul 25 17:33:37 2017 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Tue Aug 1 15:37:54 2017 +0200

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  49 +-------
 .../storm-kafka-client-examples/README.markdown |  10 ++
 examples/storm-kafka-client-examples/pom.xml    |   8 +-
 .../kafka/spout/test/KafkaSpoutTestBolt.java    |  49 ++++++++
 .../test/KafkaSpoutTopologyMainNamedTopics.java | 106 +++++++++++++++++
 .../KafkaSpoutTopologyMainWildcardTopics.java   |  61 ++++++++++
 .../TridentKafkaClientWordCountNamedTopics.java |  74 ++++++------
 ...identKafkaClientWordCountWildcardTopics.java |   5 +-
 .../KafkaSpoutTopologyMainNamedTopicsLocal.java |  66 +++++++++++
 ...fkaSpoutTopologyMainWildcardTopicsLocal.java |  29 +++++
 external/storm-kafka-client/pom.xml             |   3 -
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |   7 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |   7 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   9 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   7 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   7 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   7 +-
 .../SpoutWithMockedConsumerSetupHelper.java     |   1 -
 .../SingleTopicKafkaSpoutConfiguration.java     |  84 --------------
 .../SingleTopicKafkaSpoutConfiguration.java     |  66 +++++++++++
 .../kafka/spout/test/KafkaSpoutTestBolt.java    |  49 --------
 .../test/KafkaSpoutTopologyMainNamedTopics.java | 115 -------------------
 .../KafkaSpoutTopologyMainWildcardTopics.java   |  58 ----------
 23 files changed, 460 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 99b9ae5..a1814b4 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -217,6 +217,9 @@ final Stream spoutStream = 
tridentTopology.newStream("kafkaSpout",
 Trident does not support multiple streams and will ignore any streams set for 
output.  If however the Fields are not identical for each
 output topic it will throw an exception and not continue.
 
+#### Example topologies
+Example topologies using storm-kafka-client can be found in the 
examples/storm-kafka-client-examples directory included in the Storm source or 
binary distributions.
+
 ### Custom RecordTranslators (ADVANCED)
 
 In most cases the built in SimpleRecordTranslator and ByTopicRecordTranslator 
should cover your use case.  If you do run into a situation where you need a 
custom one
@@ -244,52 +247,6 @@ otherwise trident can throw exceptions.
 
 By default the KafkaSpout instances will be assigned partitions using a round 
robin strategy. If you need to customize partition assignment, you must 
implement the `ManualPartitioner` interface. The implementation can be passed 
to the `ManualPartitionSubscription` constructor, and the `Subscription` can 
then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` 
constructor. Please take care when supplying a custom implementation, since an 
incorrect `ManualPartitioner` implementation could leave some partitions 
unread, or concurrently read by multiple spout instances. See the 
`RoundRobinManualPartitioner` for an example of how to implement this 
functionality.
 
-## Use the Maven Shade Plugin to Build the Uber Jar
-
-Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml`
-
-```xml
-<plugin>
-    <groupId>org.apache.maven.plugins</groupId>
-    <artifactId>maven-shade-plugin</artifactId>
-    <version>2.4.1</version>
-    <executions>
-        <execution>
-            <phase>package</phase>
-            <goals>
-                <goal>shade</goal>
-            </goals>
-            <configuration>
-                <transformers>
-                    <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                        
<mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
-                    </transformer>
-                </transformers>
-            </configuration>
-        </execution>
-    </executions>
-</plugin>
-```
-
-create the uber jar by running the command:
-
-`mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml`
-
-This will create the uber jar file with the name and location matching the 
following pattern:
- 
-`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
-
-### Run Storm Topology
-
-Copy the file 
`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-*.jar` 
to `STORM_HOME/extlib`
-
-Using the Kafka command line tools create three topics [test, test1, test2] 
and use the Kafka console producer to populate the topics with some data 
-
-Execute the command `STORM_HOME/bin/storm jar 
REPO_HOME/storm/external/storm/target/storm-kafka-client-*.jar 
org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
-
-With the debug level logs enabled it is possible to see the messages of each 
topic being redirected to the appropriate Bolt as defined 
-by the streams defined and choice of shuffle grouping.   
-
 ## Using storm-kafka-client with different versions of kafka
 
 Storm-kafka-client's Kafka dependency is defined as `provided` scope in maven, 
meaning it will not be pulled in

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/README.markdown 
b/examples/storm-kafka-client-examples/README.markdown
new file mode 100644
index 0000000..74c1366
--- /dev/null
+++ b/examples/storm-kafka-client-examples/README.markdown
@@ -0,0 +1,10 @@
+## Usage
+This module contains example topologies demonstrating storm-kafka-client spout 
and Trident usage.
+
+The module is built by running `mvn clean package -Dprovided.scope=compile`. 
This will generate the `target/storm-kafka-client-examples-VERSION.jar` file. 
The jar contains all dependencies and can be submitted to Storm via the Storm 
CLI, e.g.
+```
+storm jar storm-kafka-client-examples-2.0.0-SNAPSHOT.jar 
org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics
+```
+will submit the topologies set up by KafkaSpoutTopologyMainNamedTopics to 
Storm.
+
+Note that this example produces a jar containing all dependencies for ease of 
use. In a production environment you may want to reduce the jar size by 
extracting some dependencies (e.g. org.apache.kafka:kafka-clients) from the 
jar. You can do this by setting the dependencies you don't want to include in 
the jars to `provided` scope, and then using the --artifacts flag for the storm 
jar command to fetch the dependencies when submitting the topology. See the 
[CLI 
documentation](http://storm.apache.org/releases/2.0.0-SNAPSHOT/Command-line-client.html)
 for syntax.

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/pom.xml 
b/examples/storm-kafka-client-examples/pom.xml
index 9c5796d..382942f 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -87,6 +87,12 @@
             <version>${storm.kafka.client.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -136,7 +142,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>2</maxAllowedViolations>
+                    <maxAllowedViolations>0</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
new file mode 100644
index 0000000..8ace45c
--- /dev/null
+++ 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaSpoutTestBolt extends BaseRichBolt {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpoutTestBolt.class);
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, 
OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        LOG.debug("input = [" + input + "]");
+        collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
new file mode 100644
index 0000000..0731069
--- /dev/null
+++ 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.trident.KafkaProducerTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class KafkaSpoutTopologyMainNamedTopics {
+
+    private static final String TOPIC_2_STREAM = "test_2_stream";
+    private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
+    private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
+    public static final String TOPIC_0 = "kafka-spout-test";
+    public static final String TOPIC_1 = "kafka-spout-test-1";
+    public static final String TOPIC_2 = "kafka-spout-test-2";
+
+    public static void main(String[] args) throws Exception {
+        new KafkaSpoutTopologyMainNamedTopics().runMain(args);
+    }
+
+    protected void runMain(String[] args) throws Exception {
+        final String brokerUrl = args.length > 0 ? args[0] : 
KAFKA_LOCAL_BROKER;
+        System.out.println("Running with broker url: " + brokerUrl);
+
+        Config tpConf = getConfig();
+
+        // Producers. This is just to get some data in Kafka, normally you 
would be getting this data from elsewhere
+        StormSubmitter.submitTopology(TOPIC_0 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, TOPIC_0));
+        StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
+        StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
+
+        //Consumer. Sets up a topology that reads the given Kafka spouts and 
logs the received messages
+        StormSubmitter.submitTopology("storm-kafka-client-spout-test", tpConf, 
getTopologyKafkaSpout(getKafkaSpoutConfig(brokerUrl)));
+    }
+
+    protected Config getConfig() {
+        Config config = new Config();
+        config.setDebug(true);
+        return config;
+    }
+
+    protected StormTopology getTopologyKafkaSpout(KafkaSpoutConfig<String, 
String> spoutConfig) {
+        final TopologyBuilder tp = new TopologyBuilder();
+        tp.setSpout("kafka_spout", new KafkaSpout<>(spoutConfig), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())
+            .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM)
+            .shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
+        tp.setBolt("kafka_bolt_1", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
+        return tp.createTopology();
+    }
+
+    protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String 
bootstrapServers) {
+        ByTopicRecordTranslator<String, String> trans = new 
ByTopicRecordTranslator<>(
+            (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), 
r.value()),
+            new Fields("topic", "partition", "offset", "key", "value"), 
TOPIC_0_1_STREAM);
+        trans.forTopic(TOPIC_2,
+            (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), 
r.value()),
+            new Fields("topic", "partition", "offset", "key", "value"), 
TOPIC_2_STREAM);
+        return KafkaSpoutConfig.builder(bootstrapServers, new 
String[]{TOPIC_0, TOPIC_1, TOPIC_2})
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setRetry(getRetryService())
+            .setRecordTranslator(trans)
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .build();
+    }
+
+    protected KafkaSpoutRetryService getRetryService() {
+        return new 
KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
+            TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
new file mode 100644
index 0000000..2528c00
--- /dev/null
+++ 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class KafkaSpoutTopologyMainWildcardTopics extends 
KafkaSpoutTopologyMainNamedTopics {
+
+    private static final String STREAM = "test_wildcard_stream";
+    private static final Pattern TOPIC_WILDCARD_PATTERN = 
Pattern.compile("kafka-spout-test-[1|2]");
+
+    public static void main(String[] args) throws Exception {
+        new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
+    }
+
+    @Override
+    protected StormTopology getTopologyKafkaSpout(KafkaSpoutConfig<String, 
String> spoutConfig) {
+        final TopologyBuilder tp = new TopologyBuilder();
+        tp.setSpout("kafka_spout", new KafkaSpout<>(spoutConfig), 1);
+        tp.setBolt("kafka_bolt", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
+        return tp.createTopology();
+    }
+
+    @Override
+    protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String 
bootstrapServers) {
+        return KafkaSpoutConfig.builder(bootstrapServers, 
TOPIC_WILDCARD_PATTERN)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setRetry(getRetryService())
+            .setRecordTranslator((r) -> new Values(r.topic(), r.partition(), 
r.offset(), r.key(), r.value()),
+                new Fields("topic", "partition", "offset", "key", "value"), 
STREAM)
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index 886d15d..ecc52b4 100644
--- 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -42,72 +42,68 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
 public class TridentKafkaClientWordCountNamedTopics {
+
     private static final String TOPIC_1 = "test-trident";
     private static final String TOPIC_2 = "test-trident-1";
     private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
 
-    private KafkaTridentSpoutOpaque<String, String> 
newKafkaTridentSpoutOpaque() {
-        return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
+    private KafkaTridentSpoutOpaque<String, String> 
newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, String> spoutConfig) {
+        return new KafkaTridentSpoutOpaque<>(spoutConfig);
     }
 
     private static Func<ConsumerRecord<String, String>, List<Object>> 
JUST_VALUE_FUNC = new JustValueFunc();
 
     /**
-     * Needs to be serializable
+     * Needs to be serializable.
      */
     private static class JustValueFunc implements Func<ConsumerRecord<String, 
String>, List<Object>>, Serializable {
+
         @Override
         public List<Object> apply(ConsumerRecord<String, String> record) {
             return new Values(record.value());
         }
     }
 
-    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
-        return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2)
-                .setProp(ConsumerConfig.GROUP_ID_CONFIG, 
"kafkaSpoutTestGroup_" + System.nanoTime())
-                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
-                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
-                .setRetry(newRetryService())
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .build();
+    protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String 
bootstrapServers) {
+        return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + 
System.nanoTime())
+            .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
+            .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
+            .setRetry(newRetryService())
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .build();
     }
 
     protected KafkaSpoutRetryService newRetryService() {
         return new KafkaSpoutRetryExponentialBackoff(new TimeInterval(500L, 
TimeUnit.MICROSECONDS),
-                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
+            TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
     }
 
     public static void main(String[] args) throws Exception {
         new TridentKafkaClientWordCountNamedTopics().run(args);
     }
 
-    protected void run(String[] args) throws AlreadyAliveException, 
InvalidTopologyException, AuthorizationException, InterruptedException {
-        if (args.length > 0 && Arrays.stream(args).anyMatch(option -> 
option.equals("-h"))) {
-            System.out.printf("Usage: java %s [%s] [%s] [%s] [%s]\n", 
getClass().getName(),
-                    "broker_host:broker_port", "topic1", "topic2", 
"topology_name");
-        } else {
-            final String brokerUrl = args.length > 0 ? args[0] : 
KAFKA_LOCAL_BROKER;
-            final String topic1 = args.length > 1 ? args[1] : TOPIC_1;
-            final String topic2 = args.length > 2 ? args[2] : TOPIC_2;
-
-            System.out.printf("Running with broker_url: [%s], topics: [%s, 
%s]\n", brokerUrl, topic1, topic2);
-
-            Config tpConf = new Config();
-            tpConf.setDebug(true);
-            tpConf.setMaxSpoutPending(5);
-
-            // Producers
-            StormSubmitter.submitTopology(topic1 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, topic1));
-            StormSubmitter.submitTopology(topic2 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, topic2));
-            // Consumer
-            StormSubmitter.submitTopology("topics-consumer", tpConf, 
TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
-
-            // Print results to console, which also causes the print filter in 
the consumer topology to print the results in the worker log
-            Thread.sleep(2000);
-            DrpcResultsPrinter.remoteClient().printResults(60, 1, 
TimeUnit.SECONDS);
-        }
+    protected void run(String[] args) throws AlreadyAliveException, 
InvalidTopologyException,
+        AuthorizationException, InterruptedException {
+        final String brokerUrl = args.length > 0 ? args[0] : 
KAFKA_LOCAL_BROKER;
+        System.out.println("Running with broker url " + brokerUrl);
+
+        Config tpConf = new Config();
+        tpConf.setDebug(true);
+        tpConf.setMaxSpoutPending(5);
+
+        // Producers
+        StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
+        StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
+        // Consumer
+        StormSubmitter.submitTopology("topics-consumer", tpConf,
+            
TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque(newKafkaSpoutConfig(brokerUrl))));
+
+        // Print results to console, which also causes the print filter in the 
consumer topology to print the results in the worker log
+        Thread.sleep(2000);
+        DrpcResultsPrinter.remoteClient().printResults(60, 1, 
TimeUnit.SECONDS);
         System.exit(0);     // Kill all the non daemon threads
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
index 0be3127..865ad82 100644
--- 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
+++ 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
@@ -30,8 +30,9 @@ import org.apache.storm.tuple.Values;
 public class TridentKafkaClientWordCountWildcardTopics extends 
TridentKafkaClientWordCountNamedTopics {
     private static final Pattern TOPIC_WILDCARD_PATTERN = 
Pattern.compile("test-trident(-1)?");
 
-    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
-        return KafkaSpoutConfig.builder("127.0.0.1:9092", 
TOPIC_WILDCARD_PATTERN)
+    @Override
+    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String 
bootstrapServers) {
+        return KafkaSpoutConfig.builder(bootstrapServers, 
TOPIC_WILDCARD_PATTERN)
                 .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                 .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
                 .setRecordTranslator((r) -> new Values(r.value()), new 
Fields("str"))

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopicsLocal.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopicsLocal.java
 
b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopicsLocal.java
new file mode 100644
index 0000000..05a0dd4
--- /dev/null
+++ 
b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopicsLocal.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import static 
org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics.TOPIC_0;
+import static 
org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics.TOPIC_1;
+import static 
org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics.TOPIC_2;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.kafka.trident.KafkaProducerTopology;
+
+public class KafkaSpoutTopologyMainNamedTopicsLocal {
+
+    public static void main(String[] args) throws Exception {
+        new KafkaSpoutTopologyMainNamedTopicsLocal().runExample();
+    }
+    
+    protected void runExample() throws Exception {
+        String brokerUrl = "localhost:9092";
+        KafkaSpoutTopologyMainNamedTopics example = getTopology();
+        Config tpConf = example.getConfig();
+        
+        LocalCluster localCluster = new LocalCluster();
+        // Producers. This is just to get some data in Kafka, normally you 
would be getting this data from elsewhere
+        localCluster.submitTopology(TOPIC_0 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, TOPIC_0));
+        localCluster.submitTopology(TOPIC_1 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
+        localCluster.submitTopology(TOPIC_2 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
+
+        //Consumer. Sets up a topology that reads the given Kafka spouts and 
logs the received messages
+        localCluster.submitTopology("storm-kafka-client-spout-test", tpConf, 
example.getTopologyKafkaSpout(example.getKafkaSpoutConfig(brokerUrl)));
+        
+        stopWaitingForInput();
+    }
+    
+    protected KafkaSpoutTopologyMainNamedTopics getTopology() {
+        return new KafkaSpoutTopologyMainNamedTopics();
+    }
+    
+    protected void stopWaitingForInput() {
+        try {
+            System.out.println("PRESS ENTER TO STOP");
+            new BufferedReader(new InputStreamReader(System.in)).readLine();
+            System.exit(0);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopicsLocal.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopicsLocal.java
 
b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopicsLocal.java
new file mode 100644
index 0000000..336beba
--- /dev/null
+++ 
b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopicsLocal.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+public class KafkaSpoutTopologyMainWildcardTopicsLocal extends 
KafkaSpoutTopologyMainNamedTopicsLocal {
+
+    public static void main(String[] args) throws Exception {
+        new KafkaSpoutTopologyMainWildcardTopicsLocal().runExample();
+    } 
+    
+    protected KafkaSpoutTopologyMainNamedTopics getTopology() {
+        return new KafkaSpoutTopologyMainWildcardTopics();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml 
b/external/storm-kafka-client/pom.xml
index 6db04a0..2f17ea1 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -80,18 +80,15 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
-            <version>${mockito.version}</version>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-core</artifactId>
-            <version>1.3</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-library</artifactId>
-            <version>1.3</version>
             <scope>test</scope>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index 7258fe2..bdae043 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -15,7 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.*;
@@ -29,7 +28,7 @@ import java.util.Map;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
-import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -41,6 +40,8 @@ import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.MockitoAnnotations;
 
+import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class KafkaSpoutCommitTest {
 
     private final long offsetCommitPeriodMs = 2_000;
@@ -57,7 +58,7 @@ public class KafkaSpoutCommitTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+        spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
         consumerMock = mock(KafkaConsumer.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 8e6d390..756ea1a 100755
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -15,7 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.inOrder;
@@ -35,7 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -45,6 +44,8 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 
+import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class KafkaSpoutEmitTest {
 
     private final long offsetCommitPeriodMs = 2_000;
@@ -57,7 +58,7 @@ public class KafkaSpoutEmitTest {
 
     @Before
     public void setUp() {
-        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+        spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
         consumerMock = mock(KafkaConsumer.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 23630a6..635e2d2 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,7 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
@@ -41,7 +40,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -54,6 +53,8 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
+import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class KafkaSpoutRebalanceTest {
 
     @Captor
@@ -126,7 +127,7 @@ public class KafkaSpoutRebalanceTest {
             doNothing()
                 .when(subscriptionMock)
                 .subscribe(any(), rebalanceListenerCapture.capture(), any());
-            KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            KafkaSpout<String, String> spout = new 
KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
                 .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                 .build(), consumerFactory);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -163,7 +164,7 @@ public class KafkaSpoutRebalanceTest {
                 .when(subscriptionMock)
                 .subscribe(any(), rebalanceListenerCapture.capture(), any());
         KafkaSpoutRetryService retryServiceMock = 
mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
+        KafkaSpout<String, String> spout = new 
KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
             .setOffsetCommitPeriodMs(10)
             .setRetry(retryServiceMock)
             .build(), consumerFactory);

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 078f7a1..ec557e7 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -15,7 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.*;
@@ -33,7 +32,7 @@ import java.util.Map;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
-import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -45,6 +44,8 @@ import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.MockitoAnnotations;
 
+import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class KafkaSpoutRetryLimitTest {
     
     private final long offsetCommitPeriodMs = 2_000;
@@ -65,7 +66,7 @@ public class KafkaSpoutRetryLimitTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+        spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .setRetry(ZERO_RETRIES_RETRY_SERVICE)
             .build();

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index 261c654..c361e24 100755
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -15,7 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.everyItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -38,7 +37,7 @@ import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.storm.kafka.KafkaUnitRule;
-import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -48,6 +47,8 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockitoAnnotations;
 
+import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class MaxUncommittedOffsetTest {
 
     @Rule
@@ -61,7 +62,7 @@ public class MaxUncommittedOffsetTest {
     private final int maxUncommittedOffsets = 10;
     private final int maxPollRecords = 5;
     private final int initialRetryDelaySecs = 60;
-    private final KafkaSpoutConfig<String, String> spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+    private final KafkaSpoutConfig<String, String> spoutConfig = 
createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
         .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
         .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
         .setMaxUncommittedOffsets(maxUncommittedOffsets)

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 6b92de8..35f95cd 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Matchers.any;
@@ -45,7 +44,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.KafkaUnitRule;
-import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -60,6 +59,8 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
+import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class SingleTopicKafkaSpoutTest {
 
     @Rule
@@ -80,7 +81,7 @@ public class SingleTopicKafkaSpoutTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig<String, String> spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        KafkaSpoutConfig<String, String> spoutConfig = 
createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
             .setRetry(new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
                 maxRetries, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index 5f931bb..aa65d0f 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -17,7 +17,6 @@
 package org.apache.storm.kafka.spout;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
deleted file mode 100644
index d2f38b0..0000000
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout.builders;
-
-import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
-import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.storm.Config;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.subscription.Subscription;
-import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class SingleTopicKafkaSpoutConfiguration {
-
-    public static final String STREAM = "test_stream";
-    public static final String TOPIC = "test";
-
-    /**
-     * Retry in a tight loop (keep unit tests fasts).
-     */
-    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
-        new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
-            DEFAULT_MAX_RETRIES, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
-    public static Config getConfig() {
-        Config config = new Config();
-        config.setDebug(true);
-        return config;
-    }
-
-    public static StormTopology getTopologyKafkaSpout(int port) {
-        final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
-        tp.setBolt("kafka_bolt", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
-        return tp.createTopology();
-    }
-
-    public static KafkaSpoutConfig.Builder<String, String> 
getKafkaSpoutConfigBuilder(int port) {
-        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + 
port, TOPIC));
-    }
-
-    public static KafkaSpoutConfig.Builder<String, String> 
getKafkaSpoutConfigBuilder(Subscription subscription, int port) {
-        return setCommonSpoutConfig(new 
KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
-    }
-
-    private static KafkaSpoutConfig.Builder<String, String> 
setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
-        return config.setRecordTranslator((r) -> new Values(r.topic(), 
r.key(), r.value()),
-            new Fields("topic", "key", "value"), STREAM)
-            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
-            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
-            .setRetry(getRetryService())
-            .setOffsetCommitPeriodMs(10_000)
-            .setFirstPollOffsetStrategy(EARLIEST)
-            .setMaxUncommittedOffsets(250)
-            .setPollTimeoutMs(1000);
-    }
-
-    protected static KafkaSpoutRetryService getRetryService() {
-        return UNIT_TEST_RETRY_SERVICE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
new file mode 100644
index 0000000..2bf1f36
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.config.builder;
+
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class SingleTopicKafkaSpoutConfiguration {
+
+    public static final String STREAM = "test_stream";
+    public static final String TOPIC = "test";
+
+    /**
+     * Retry in a tight loop (keep unit tests fasts).
+     */
+    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
+        new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    public static KafkaSpoutConfig.Builder<String, String> 
createKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + 
port, TOPIC));
+    }
+
+    public static KafkaSpoutConfig.Builder<String, String> 
createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+        return setCommonSpoutConfig(new 
KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
+    }
+
+    private static KafkaSpoutConfig.Builder<String, String> 
setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+        return config.setRecordTranslator((r) -> new Values(r.topic(), 
r.key(), r.value()),
+            new Fields("topic", "key", "value"), STREAM)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+            .setRetry(getRetryService())
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .setPollTimeoutMs(1000);
+    }
+
+    protected static KafkaSpoutRetryService getRetryService() {
+        return UNIT_TEST_RETRY_SERVICE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
deleted file mode 100644
index 8ace45c..0000000
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.test;
-
-import java.util.Map;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaSpoutTestBolt extends BaseRichBolt {
-    protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpoutTestBolt.class);
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context, 
OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        LOG.debug("input = [" + input + "]");
-        collector.ack(input);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
deleted file mode 100644
index 50991e8..0000000
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.test;
-
-import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
-import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
-import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class KafkaSpoutTopologyMainNamedTopics {
-    private static final String TOPIC_2_STREAM = "test_2_stream";
-    private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
-    private static final String[] TOPICS = new 
String[]{"test","test1","test2"};
-
-    public static void main(String[] args) throws Exception {
-        new KafkaSpoutTopologyMainNamedTopics().runMain(args);
-    }
-
-    protected void runMain(String[] args) throws Exception {
-        if (args.length == 0) {
-            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
-        } else {
-            submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), 
getConfig());
-        }
-    }
-
-    protected void submitTopologyLocalCluster(StormTopology topology, Config 
config) throws Exception {
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("test", config, topology);
-        stopWaitingForInput();
-    }
-
-    protected void submitTopologyRemoteCluster(String arg, StormTopology 
topology, Config config) throws Exception {
-        StormSubmitter.submitTopology(arg, config, topology);
-    }
-
-    protected void stopWaitingForInput() {
-        try {
-            System.out.println("PRESS ENTER TO STOP");
-            new BufferedReader(new InputStreamReader(System.in)).readLine();
-            System.exit(0);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    protected Config getConfig() {
-        Config config = new Config();
-        config.setDebug(true);
-        return config;
-    }
-
-    protected StormTopology getTopologyKafkaSpout() {
-        final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
-        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())
-          .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM)
-          .shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
-        tp.setBolt("kafka_bolt_1", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
-        return tp.createTopology();
-    }
-
-    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
-        ByTopicRecordTranslator<String, String> trans = new 
ByTopicRecordTranslator<>(
-                (r) -> new Values(r.topic(), r.partition(), r.offset(), 
r.key(), r.value()),
-                new Fields("topic", "partition", "offset", "key", "value"), 
TOPIC_0_1_STREAM);
-        trans.forTopic(TOPICS[2], 
-                (r) -> new Values(r.topic(), r.partition(), r.offset(), 
r.key(), r.value()),
-                new Fields("topic", "partition", "offset", "key", "value"), 
TOPIC_2_STREAM);
-        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS)
-                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
-                .setRetry(getRetryService())
-                .setRecordTranslator(trans)
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .build();
-    }
-
-    protected KafkaSpoutRetryService getRetryService() {
-            return new 
KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
-                    TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
deleted file mode 100644
index d50e2ab..0000000
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.test;
-
-import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class KafkaSpoutTopologyMainWildcardTopics extends 
KafkaSpoutTopologyMainNamedTopics {
-    private static final String STREAM = "test_wildcard_stream";
-    private static final Pattern TOPIC_WILDCARD_PATTERN = 
Pattern.compile("test[1|2]");
-
-    public static void main(String[] args) throws Exception {
-        new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
-    }
-
-    protected StormTopology getTopologyKafkaSpout() {
-        final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
-        tp.setBolt("kafka_bolt", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
-        return tp.createTopology();
-    }
-
-    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
-        return KafkaSpoutConfig.builder("127.0.0.1:9092", 
TOPIC_WILDCARD_PATTERN)
-                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
-                .setRetry(getRetryService())
-                .setRecordTranslator((r) -> new Values(r.topic(), 
r.partition(), r.offset(), r.key(), r.value()),
-                        new Fields("topic", "partition", "offset", "key", 
"value"), STREAM)
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .build();
-    }
-}

Reply via email to