http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
deleted file mode 100644
index c230f09..0000000
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.OutputFieldsGetter;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Represents the {@link KafkaSpoutStream} associated with each topic, and 
provides a public API to
- * declare output streams and emmit tuples, on the appropriate stream, for all 
the topics specified.
- */
-public class KafkaSpoutStreamsNamedTopics implements KafkaSpoutStreams {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpoutStreamsNamedTopics.class);
-
-    private final Map<String, KafkaSpoutStream> topicToStream;
-
-    private KafkaSpoutStreamsNamedTopics(Builder builder) {
-        this.topicToStream = builder.topicToStream;
-        LOG.debug("Built {}", this);
-    }
-
-    /**
-     * @param topic the topic for which to get output fields
-     * @return the declared output fields
-     */
-    public Fields getOutputFields(String topic) {
-        if (topicToStream.containsKey(topic)) {
-            final Fields outputFields = 
topicToStream.get(topic).getOutputFields();
-            LOG.trace("Topic [{}] has output fields [{}]", topic, 
outputFields);
-            return outputFields;
-        }
-        throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
-    }
-
-    /**
-     * @param topic the topic to for which to get the stream id
-     * @return the id of the stream to where the tuples are emitted
-     */
-    public KafkaSpoutStream getStream(String topic) {
-        if (topicToStream.containsKey(topic)) {
-            return topicToStream.get(topic);
-        }
-        throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
-    }
-
-    /**
-     * @return list of topics subscribed and emitting tuples to a stream as 
configured by {@link KafkaSpoutStream}
-     */
-    public List<String> getTopics() {
-        return new ArrayList<>(topicToStream.keySet());
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (KafkaSpoutStream stream : topicToStream.values()) {
-            if 
(!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId()))
 {
-                stream.declareOutputFields(declarer);
-            }
-        }
-    }
-
-    public void emit(SpoutOutputCollector collector, List<Object> tuple, 
KafkaSpoutMessageId messageId) {
-        getStream(messageId.topic()).emit(collector, tuple, messageId);
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutStreamsNamedTopics{" +
-                "topicToStream=" + topicToStream +
-                '}';
-    }
-
-    public static class Builder {
-        private final Map<String, KafkaSpoutStream> topicToStream = new 
HashMap<>();;
-
-        /**
-         * Creates a {@link KafkaSpoutStream} with the given output Fields for 
each topic specified.
-         * All topics will have the default stream id and the same output 
fields.
-         */
-        public Builder(Fields outputFields, String... topics) {
-            addStream(outputFields, topics);
-        }
-
-        /**
-         * Creates a {@link KafkaSpoutStream} with this particular stream for 
each topic specified.
-         * All the topics will have the specified stream id and the same 
output fields.
-         */
-        public Builder (Fields outputFields, String streamId, String... 
topics) {
-            addStream(outputFields, streamId, topics);
-        }
-
-        /**
-         * Adds this stream to the state representing the streams associated 
with each topic
-         */
-        public Builder(KafkaSpoutStream stream) {
-            addStream(stream);
-        }
-
-        /**
-         * Adds this stream to the state representing the streams associated 
with each topic
-         */
-        public Builder addStream(KafkaSpoutStream stream) {
-            topicToStream.put(stream.getTopic(), stream);
-            return this;
-        }
-
-        /**
-         * Please refer to javadoc in {@link #Builder(Fields, String...)}
-         */
-        public Builder addStream(Fields outputFields, String... topics) {
-            addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
-            return this;
-        }
-
-        /**
-         * Please refer to javadoc in {@link #Builder(Fields, String, 
String...)}
-         */
-        public Builder addStream(Fields outputFields, String streamId, 
String... topics) {
-            for (String topic : topics) {
-                topicToStream.put(topic, new KafkaSpoutStream(outputFields, 
streamId, topic));
-            }
-            return this;
-        }
-
-        public KafkaSpoutStreamsNamedTopics build() {
-            return new KafkaSpoutStreamsNamedTopics(this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
deleted file mode 100644
index 5c3bd47..0000000
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-
-import java.util.List;
-import java.util.regex.Pattern;
-
-public class KafkaSpoutStreamsWildcardTopics implements KafkaSpoutStreams {
-    private KafkaSpoutStream kafkaSpoutStream;
-
-    public KafkaSpoutStreamsWildcardTopics(KafkaSpoutStream kafkaSpoutStream) {
-        this.kafkaSpoutStream = kafkaSpoutStream;
-        if (kafkaSpoutStream.getTopicWildcardPattern() == null) {
-            throw new IllegalStateException("KafkaSpoutStream must be 
configured for wildcard topic");
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        kafkaSpoutStream.declareOutputFields(declarer);
-    }
-
-    @Override
-    public void emit(SpoutOutputCollector collector, List<Object> tuple, 
KafkaSpoutMessageId messageId) {
-        kafkaSpoutStream.emit(collector, tuple, messageId);
-    }
-
-    public KafkaSpoutStream getStream() {
-        return kafkaSpoutStream;
-    }
-
-    public Pattern getTopicWildcardPattern() {
-        return kafkaSpoutStream.getTopicWildcardPattern();
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutStreamsWildcardTopics{" +
-                "kafkaSpoutStream=" + kafkaSpoutStream +
-                '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
deleted file mode 100644
index 3bb71a8..0000000
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to 
build tuples from {@link ConsumerRecord}s.
- * Users must subclass this abstract class to provide their implementation. 
See also {@link KafkaSpoutTuplesBuilder}
- */
-public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable {
-    private List<String> topics;
-
-    /**
-     * @param topics list of topics that use this implementation to build 
tuples
-     */
-    public KafkaSpoutTupleBuilder(String... topics) {
-        if (topics == null || topics.length == 0) {
-            throw new IllegalArgumentException("Must specify at least one 
topic. It cannot be null or empty");
-        }
-        this.topics = Arrays.asList(topics);
-    }
-
-    /**
-     * @return list of topics that use this implementation to build tuples
-     */
-    public List<String> getTopics() {
-        return Collections.unmodifiableList(topics);
-    }
-
-    /**
-     * Builds a list of tuples using the ConsumerRecord specified as parameter
-     * @param consumerRecord whose contents are used to build tuples
-     * @return list of tuples
-     */
-    public abstract List<Object> buildTuple(ConsumerRecord<K, V> 
consumerRecord);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
deleted file mode 100644
index 2ba0a79..0000000
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from 
{@link ConsumerRecord}s.
- * The logic is provided by the user by implementing the appropriate number of 
{@link KafkaSpoutTupleBuilder} instances
- */
-public interface KafkaSpoutTuplesBuilder<K,V> extends Serializable {
-    List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
deleted file mode 100644
index 80fe543..0000000
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class KafkaSpoutTuplesBuilderNamedTopics<K,V> implements 
KafkaSpoutTuplesBuilder<K,V> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpoutTuplesBuilderNamedTopics.class);
-
-    private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
-
-    private KafkaSpoutTuplesBuilderNamedTopics(Builder<K,V> builder) {
-        this.topicToTupleBuilders = builder.topicToTupleBuilders;
-        LOG.debug("Instantiated {}", this);
-    }
-
-    public static class Builder<K,V> {
-        private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
-        private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
-
-        @SafeVarargs
-        public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) {
-            if (tupleBuilders == null || tupleBuilders.length == 0) {
-                throw new IllegalArgumentException("Must specify at last one 
tuple builder per topic declared in KafkaSpoutStreams");
-            }
-
-            this.tupleBuilders = Arrays.asList(tupleBuilders);
-            topicToTupleBuilders = new HashMap<>();
-        }
-
-        public KafkaSpoutTuplesBuilderNamedTopics<K,V> build() {
-            for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
-                for (String topic : tupleBuilder.getTopics()) {
-                    if (!topicToTupleBuilders.containsKey(topic)) {
-                        topicToTupleBuilders.put(topic, tupleBuilder);
-                    }
-                }
-            }
-            return new KafkaSpoutTuplesBuilderNamedTopics<>(this);
-        }
-    }
-
-    public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) {
-        final String topic = consumerRecord.topic();
-        return topicToTupleBuilders.get(topic).buildTuple(consumerRecord);
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutTuplesBuilderNamedTopics {" +
-                "topicToTupleBuilders=" + topicToTupleBuilders +
-                '}';
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
deleted file mode 100644
index 85d4809..0000000
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.List;
-
-public class KafkaSpoutTuplesBuilderWildcardTopics<K,V> implements 
KafkaSpoutTuplesBuilder<K,V> {
-    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
-
-    public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> 
tupleBuilder) {
-        this.tupleBuilder = tupleBuilder;
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return tupleBuilder.buildTuple(consumerRecord);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
new file mode 100644
index 0000000..f5953ad
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * A list of Values in a tuple that can be routed 
+ * to a given stream. {@link 
org.apache.storm.kafka.spout.RecordTranslator#apply}
+ */
+public class KafkaTuple extends Values {
+    private static final long serialVersionUID = 4803794470450587992L;
+    private String stream = null;
+    
+    public KafkaTuple() {
+        super();
+    }
+    
+    public KafkaTuple(Object... vals) {
+        super(vals);
+    }
+    
+    public KafkaTuple routedTo(String stream) {
+        assert(this.stream == null);
+        this.stream = stream;
+        return this;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
new file mode 100644
index 0000000..3f16220
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+
+/**
+ * The KafkaTupleListener handles state changes of a kafka tuple inside a 
KafkaSpout.
+ */
+public interface KafkaTupleListener extends Serializable {
+
+
+    /**
+     * Called during the initialization of the kafka spout.
+     *
+     * @param conf The storm configuration.
+     * @param context The {@link TopologyContext}
+     */
+    void open(Map<String, Object> conf, TopologyContext context);
+
+    /**
+     * Called when the tuple is emitted and auto commit is disabled.
+     * If kafka auto commit is enabled, the kafka consumer will periodically 
(depending on the commit interval)
+     * commit the offsets. Therefore, storm disables anchoring for tuples when 
auto commit is enabled and the spout will
+     * not receive acks and fails for those tuples.
+     *
+     * @param tuple the storm tuple.
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when a tuple is acked.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onAck(KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when kafka partitions are rebalanced.
+     *
+     * @param partitions The list of partitions that are now assigned to the 
consumer (may include partitions previously
+     *                   assigned to the consumer)
+     */
+    void onPartitionsReassigned(Collection<TopicPartition> partitions);
+
+    /**
+     * Called when the Kafka spout sets a record for retry.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onRetry(KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when the maximum number of retries has been reached.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onMaxRetryReached(KafkaSpoutMessageId msgId);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
new file mode 100644
index 0000000..05cd361
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionSubscription extends Subscription {
+    private static final long serialVersionUID = 5633018073527583826L;
+    private final ManualPartitioner partitioner;
+    private final TopicFilter partitionFilter;
+    private transient KafkaConsumer<?, ?> consumer = null;
+    private transient ConsumerRebalanceListener listener = null;
+    private transient TopologyContext context = null;
+
+    public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter 
partitionFilter) {
+        this.partitionFilter = partitionFilter;
+        this.partitioner = parter;
+    }
+    
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, 
ConsumerRebalanceListener listener, TopologyContext context) {
+        this.consumer = consumer;
+        this.listener = listener;
+        this.context = context;
+        refreshAssignment();
+    }
+    
+    @Override
+    public void refreshAssignment() {
+        List<TopicPartition> allPartitions = 
partitionFilter.getFilteredTopicPartitions(consumer);
+        Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
+        Set<TopicPartition> newAssignment = new 
HashSet<>(partitioner.partition(allPartitions, context));
+        Set<TopicPartition> currentAssignment = consumer.assignment();
+        if (!newAssignment.equals(currentAssignment)) {
+            listener.onPartitionsRevoked(currentAssignment);
+            consumer.assign(newAssignment);
+            listener.onPartitionsAssigned(newAssignment);
+        }
+    }
+    
+    @Override
+    public String getTopicsString() {
+        return partitionFilter.getTopicsString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
new file mode 100644
index 0000000..f9a6869
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.List;
+
+import java.io.Serializable;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A function used to assign partitions to this spout.
+ * WARNING if this is not done correctly you can really mess things up, like 
not reading data in some partitions.
+ * The complete TopologyContext is passed in, but it is suggested that you use 
the index of the spout and the total
+ * number of spouts to avoid missing partitions or double assigning partitions.
+ */
+public interface ManualPartitioner extends Serializable {
+    /**
+     * Get the partitions for this assignment
+     * @param allPartitions all of the partitions that the set of spouts want 
to subscribe to, in a strict ordering
+     * @param context the context of the topology
+     * @return the subset of the partitions that this spout should use.
+     */
+    public List<TopicPartition> partition(List<TopicPartition> allPartitions, 
TopologyContext context);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
new file mode 100644
index 0000000..3bb7152
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to a fixed list of named topics.
+ *
+ * @deprecated Please use {@link ManualPartitionSubscription} with {@link NamedTopicFilter} instead
+ */
+@Deprecated
+public class NamedSubscription extends Subscription {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NamedSubscription.class);
+    private static final long serialVersionUID = 3438543305215813839L;
+    // Unmodifiable private copy of the topic names passed to the constructor.
+    protected final Collection<String> topics;
+    
+    /**
+     * Create a subscription for the given topic names. The names are copied,
+     * so later changes to the passed collection do not affect this subscription.
+     *
+     * @param topics the names of the topics to subscribe to
+     */
+    public NamedSubscription(Collection<String> topics) {
+        this.topics = Collections.unmodifiableCollection(new 
ArrayList<>(topics));
+    }
+    
+    /**
+     * Convenience constructor taking the topic names as varargs.
+     *
+     * @param topics the names of the topics to subscribe to
+     */
+    public NamedSubscription(String ... topics) {
+        this(Arrays.asList(topics));
+    }
+
+    /**
+     * Subscribe the consumer to the configured topics and issue an initial poll.
+     *
+     * @param consumer the consumer to subscribe
+     * @param listener the rebalance listener registered with the subscription
+     * @param unused the topology context; not used by this implementation
+     */
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, 
ConsumerRebalanceListener listener, TopologyContext unused) {
+        consumer.subscribe(topics, listener);
+        LOG.info("Kafka consumer subscribed topics {}", topics);
+        
+        // Initial poll to get the consumer registration process going.
+        // KafkaSpoutConsumerRebalanceListener will be called following this poll,
+        // upon partition registration
+        consumer.poll(0);
+    }
+
+    /**
+     * @return the configured topic names joined with commas.
+     */
+    @Override
+    public String getTopicsString() {
+        return StringUtils.join(topics, ",");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
new file mode 100644
index 0000000..3d1ec1e
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for the specified topics.
+ */
+public class NamedTopicFilter implements TopicFilter {
+
+    private final Set<String> topics;
+
+    /**
+     * Create filter based on a set of topic names.
+     *
+     * <p>The set is copied (preserving its iteration order), so changes the caller
+     * makes to it afterwards do not alter the filter.
+     *
+     * @param topics The topic names the filter will pass.
+     */
+    public NamedTopicFilter(Set<String> topics) {
+        this.topics = Collections.unmodifiableSet(new LinkedHashSet<>(topics));
+    }
+
+    /**
+     * Convenience constructor.
+     *
+     * @param topics The topic names the filter will pass.
+     */
+    public NamedTopicFilter(String... topics) {
+        this(new HashSet<>(Arrays.asList(topics)));
+    }
+
+    /**
+     * Look up the partitions of every configured topic.
+     *
+     * @param consumer The Kafka consumer to use to read the list of existing partitions
+     * @return The partitions of all configured topics that the consumer reports;
+     *     topics for which no partition metadata is returned contribute nothing.
+     */
+    @Override
+    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for (String topic : topics) {
+            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+            if (partitionInfos == null) {
+                // No metadata for this topic (e.g. it does not exist yet); skip it
+                // instead of failing with a NullPointerException.
+                continue;
+            }
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+            }
+        }
+        return allPartitions;
+    }
+
+    /**
+     * @return the configured topic names joined with commas.
+     */
+    @Override
+    public String getTopicsString() {
+        return StringUtils.join(topics, ",");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
new file mode 100644
index 0000000..dc9f9e3
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to all topics that match a given pattern.
+ *
+ * @deprecated Please use {@link ManualPartitionSubscription} with {@link PatternTopicFilter} instead
+ */
+@Deprecated
+public class PatternSubscription extends Subscription {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PatternSubscription.class);
+    private static final long serialVersionUID = 3438543305215813839L;
+    // Pattern handed to the consumer when subscribing.
+    protected final Pattern pattern;
+    
+    /**
+     * Create a subscription for topics matching the given pattern.
+     *
+     * @param pattern the pattern topic names must match
+     */
+    public PatternSubscription(Pattern pattern) {
+        this.pattern = pattern;
+    }
+
+    /**
+     * Subscribe the consumer using the configured pattern and issue an initial poll.
+     *
+     * @param consumer the consumer to subscribe
+     * @param listener the rebalance listener registered with the subscription
+     * @param unused the topology context; not used by this implementation
+     */
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, 
ConsumerRebalanceListener listener, TopologyContext unused) {
+        consumer.subscribe(pattern, listener);
+        LOG.info("Kafka consumer subscribed topics matching wildcard pattern 
[{}]", pattern);
+        
+        // Initial poll to get the consumer registration process going.
+        // KafkaSpoutConsumerRebalanceListener will be called following this poll,
+        // upon partition registration
+        consumer.poll(0);
+    }
+
+    /**
+     * @return the regular expression string of the configured pattern.
+     */
+    @Override
+    public String getTopicsString() {
+        return pattern.pattern();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
new file mode 100644
index 0000000..5d2db96
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for topics matching the given {@link Pattern}.
+ */
+public class PatternTopicFilter implements TopicFilter {
+
+    private final Pattern pattern;
+    // Names of the topics matched by the most recent getFilteredTopicPartitions call.
+    private final Set<String> topics = new HashSet<>();
+
+    /**
+     * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
+     *
+     * @param pattern The Pattern to use.
+     */
+    public PatternTopicFilter(Pattern pattern) {
+        this.pattern = pattern;
+    }
+
+    /**
+     * Lists the topics known to the consumer and collects the partitions of those whose
+     * name matches the pattern. The remembered topic names are reset on every call.
+     *
+     * @param consumer The Kafka consumer to use to read the list of existing partitions
+     * @return The partitions of all matching topics.
+     */
+    @Override
+    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+        topics.clear();
+        final List<TopicPartition> matchedPartitions = new ArrayList<>();
+        final Map<String, List<PartitionInfo>> partitionsByTopic = consumer.listTopics();
+        for (Map.Entry<String, List<PartitionInfo>> topicEntry : partitionsByTopic.entrySet()) {
+            final boolean nameMatches = pattern.matcher(topicEntry.getKey()).matches();
+            if (!nameMatches) {
+                continue;
+            }
+            for (PartitionInfo info : topicEntry.getValue()) {
+                matchedPartitions.add(new TopicPartition(info.topic(), info.partition()));
+                topics.add(info.topic());
+            }
+        }
+        return matchedPartitions;
+    }
+
+    /**
+     * @return the topic names matched by the last lookup, joined with commas.
+     */
+    @Override
+    public String getTopicsString() {
+        return StringUtils.join(topics, ",");
+    }
+
+    /**
+     * @return the regular expression string of the configured pattern.
+     */
+    public String getTopicsPattern() {
+        return pattern.pattern();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
new file mode 100644
index 0000000..71af4d0
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
+
+/**
+ * Translate a {@link org.apache.kafka.clients.consumer.ConsumerRecord} to a tuple.
+ *
+ * @param <K> the key type of the consumed records
+ * @param <V> the value type of the consumed records
+ */
+public interface RecordTranslator<K, V> extends Serializable, 
Func<ConsumerRecord<K, V>, List<Object>> {
+    /** Stream list containing only the {@code "default"} stream name. */
+    public static final List<String> DEFAULT_STREAM = 
Collections.singletonList("default");
+    
+    /**
+     * Translate the ConsumerRecord into a list of objects that can be emitted.
+     *
+     * @param record the record to translate
+     * @return the objects in the tuple.  Return a {@link KafkaTuple}
+     *     if you want to route the tuple to a non-default stream.
+     *     Return null to discard an invalid {@link ConsumerRecord} if
+     *     {@link Builder#setEmitNullTuples(boolean)} is set to true
+     */
+    List<Object> apply(ConsumerRecord<K,V> record);
+    
+    /**
+     * Get the fields associated with a stream.  The streams passed in are
+     * returned by the {@link RecordTranslator#streams()} method.
+     *
+     * @param stream the stream the fields are for
+     * @return the fields for that stream.
+     */
+    Fields getFieldsFor(String stream);
+    
+    /**
+     * @return the list of streams that this will handle.
+     */
+    List<String> streams();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
new file mode 100644
index 0000000..e23e2dc
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Assign partitions in a round robin fashion for all spouts,
+ * not just the ones that are alive. Because the parallelism of
+ * the spouts does not typically change while running this makes
+ * the assignments more stable in the face of crashing spouts.
+ *
+ * <p>Round Robin means that first spout of N spouts will get the first
+ * partition, and the N+1th partition... The second spout will get the second
+ * partition and N+2th partition etc.
+ */
+public class RoundRobinManualPartitioner implements ManualPartitioner {
+
+    /**
+     * Pick every N-th partition, starting at this task's index, where N is the
+     * number of tasks of this component.
+     *
+     * @param allPartitions all partitions to distribute, in a strict ordering
+     * @param context the topology context used to read this task's index and the task count
+     * @return the partitions assigned to this task
+     */
+    @Override
+    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
+        final int taskIndex = context.getThisTaskIndex();
+        final int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        final int partitionCount = allPartitions.size();
+        final Set<TopicPartition> assigned = new HashSet<>(partitionCount / taskCount + 1);
+        int position = taskIndex;
+        while (position < partitionCount) {
+            assigned.add(allPartitions.get(position));
+            position += taskCount;
+        }
+        return new ArrayList<>(assigned);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
new file mode 100644
index 0000000..cc37348
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * A Kafka {@link Deserializer} that is also {@link Serializable}. Declares no members of its own.
+ *
+ * @param <T> The type this deserializer deserializes to.
+ * @deprecated Avoid using this class. Use
+ *     {@link KafkaSpoutConfig.Builder#setProp(java.lang.String, java.lang.Object) } with
+ *     {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+ *     {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+ */
+@Deprecated
+public interface SerializableDeserializer<T> extends Deserializer<T>, 
Serializable {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
new file mode 100644
index 0000000..46c2849
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * A {@link RecordTranslator} that delegates the translation to a supplied function
+ * and routes every resulting tuple to a single stream.
+ *
+ * @param <K> the key type of the consumed records
+ * @param <V> the value type of the consumed records
+ */
+public class SimpleRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = 4678369144122009596L;
+    private final Fields fields;
+    private final Func<ConsumerRecord<K, V>, List<Object>> func;
+    private final String stream;
+
+    /**
+     * Create a translator that emits on the stream named {@code "default"}.
+     *
+     * @param func the function turning a record into tuple values
+     * @param fields the fields declared for the emitted tuples
+     */
+    public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        this(func, fields, "default");
+    }
+
+    /**
+     * Create a translator that emits on the given stream.
+     *
+     * @param func the function turning a record into tuple values
+     * @param fields the fields declared for the emitted tuples
+     * @param stream the stream every tuple is routed to
+     */
+    public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        this.func = func;
+        this.fields = fields;
+        this.stream = stream;
+    }
+
+    /**
+     * Apply the wrapped function and wrap its result in a {@link KafkaTuple}
+     * routed to the configured stream.
+     *
+     * @param record the record to translate
+     * @return the routed tuple
+     */
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        final KafkaTuple tuple = new KafkaTuple();
+        final List<Object> values = func.apply(record);
+        tuple.addAll(values);
+        return tuple.routedTo(stream);
+    }
+
+    /**
+     * @param stream ignored; the same fields are returned for any stream name
+     * @return the fields given at construction time
+     */
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return fields;
+    }
+
+    /**
+     * @return a one-element list holding the configured stream name
+     */
+    @Override
+    public List<String> streams() {
+        return Arrays.asList(stream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
new file mode 100644
index 0000000..722039d
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+    private static final long serialVersionUID = -216136367240198716L;
+
+    /**
+     * Subscribe the KafkaConsumer to the proper topics. Implementations must ensure that
+     * a given topic partition is always assigned to the same spout task. Adding and
+     * removing partitions as necessary is fine, but partitions must not move from one
+     * task to another. This constraint is only important for use with the Trident spout.
+     *
+     * @param consumer the Consumer to get.
+     * @param listener the rebalance listener to include in the subscription
+     * @param context the {@link TopologyContext} made available to the implementation
+     */
+    public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, 
ConsumerRebalanceListener listener, TopologyContext context);
+    
+    /**
+     * @return A human-readable string representing the subscribed topics.
+     */
+    public abstract String getTopicsString();
+    
+    /**
+     * NOOP is the default behavior, which means that Kafka will internally handle
+     * partition assignment. If you wish to do manual partition management, you must
+     * provide an implementation of this method that will check with kafka for any
+     * changes and call the ConsumerRebalanceListener from subscribe to inform the
+     * rest of the system of those changes.
+     */
+    public void refreshAssignment() {
+        //NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
new file mode 100644
index 0000000..7631c8a
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Selects Kafka topic partitions using a consumer, and describes the selected
+ * topics as a human-readable string.
+ */
+public interface TopicFilter extends Serializable {
+    
+    /**
+     * Get the Kafka TopicPartitions passed by this filter.
+     *
+     * @param consumer The Kafka consumer to use to read the list of existing partitions
+     * @return The Kafka partitions passed by this filter.
+     */
+    List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> 
consumer);
+    
+    /**
+     * @return A human-readable string representing the topics that pass the filter.
+     */
+    String getTopicsString();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
new file mode 100644
index 0000000..dafb97c
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.Comparator;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Singleton comparator of TopicPartitions.  Topics have precedence over 
partitions.
+ * Topics are compared through String.compare and partitions are compared
+ * numerically.
+ * 
+ * Use INSTANCE for all sorting.
+ */
+public class TopicPartitionComparator implements Comparator<TopicPartition> {
+    public static final TopicPartitionComparator INSTANCE = new 
TopicPartitionComparator();
+    
+    /**
+     * Private to make it a singleton
+     */
+    private TopicPartitionComparator() {
+        //Empty
+    }
+    
+    @Override
+    public int compare(TopicPartition o1, TopicPartition o2) {
+        if (!o1.topic().equals(o2.topic())) {
+            return o1.topic().compareTo(o2.topic());
+        } else {
+            return o1.partition() - o2.partition();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
new file mode 100644
index 0000000..b7fd1a6
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Object representing metadata committed to Kafka.
+ */
+public class CommitMetadata {
+    private final String topologyId;
+    private final int taskId;
+    private final String threadName;
+
+    /**
+     * Kafka metadata.
+     *
+     * @param topologyId value bound to the {@code topologyId} JSON property
+     * @param taskId value bound to the {@code taskId} JSON property
+     * @param threadName value bound to the {@code threadName} JSON property
+     */
+    @JsonCreator
+    public CommitMetadata(@JsonProperty("topologyId") String topologyId,
+                          @JsonProperty("taskId") int taskId,
+                          @JsonProperty("threadName") String threadName) {
+        this.topologyId = topologyId;
+        this.taskId = taskId;
+        this.threadName = threadName;
+    }
+
+    /**
+     * @return the topology id given at construction time
+     */
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    /**
+     * @return the task id given at construction time
+     */
+    public int getTaskId() {
+        return taskId;
+    }
+
+    /**
+     * @return the thread name given at construction time
+     */
+    public String getThreadName() {
+        return threadName;
+    }
+
+    /**
+     * @return a string of the form
+     *     {@code CommitMetadata{topologyId='...', taskId=..., threadName='...'}}
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("CommitMetadata{");
+        text.append("topologyId='").append(topologyId).append('\'');
+        text.append(", taskId=").append(taskId);
+        text.append(", threadName='").append(threadName).append('\'');
+        text.append('}');
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
new file mode 100644
index 0000000..a63619c
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public final class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context. The commit metadata string is built once here,
+     * as the JSON form of a {@link CommitMetadata} holding the topology id, the task id and the
+     * name of the thread that runs this constructor.
+     *
+     * @param context The topology context the topology id and task id are read from
+     * @param processingGuarantee The processing guarantee consulted when checking commit ownership
+     * @throws RuntimeException if the commit metadata cannot be serialized to JSON
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        this.processingGuarantee = processingGuarantee;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+        } catch (JsonProcessingException e) {
+            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+     *
+     * <p>With {@link ProcessingGuarantee#AT_LEAST_ONCE}, a partition whose offset manager has already
+     * committed is accepted without reading the metadata. Otherwise the metadata is parsed as
+     * {@link CommitMetadata} and its topology id is compared with the id of this topology. Metadata that
+     * cannot be parsed, or that parses but carries no topology id, is treated as not committed by this
+     * topology.
+     *
+     * @param tp The topic partition the commit metadata belongs to.
+     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+     * @param offsetManagers The offset managers.
+     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+     */
+    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
+        Map<TopicPartition, OffsetManager> offsetManagers) {
+        if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+            final OffsetManager offsetManager = offsetManagers.get(tp);
+            if (offsetManager != null && offsetManager.hasCommitted()) {
+                return true;
+            }
+        }
+
+        try {
+            final CommitMetadata committedMetadata =
+                JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+            // Well-formed JSON written by another process may bind to a null object (the JSON
+            // literal null) or to an object without a topologyId (e.g. "{}"). Neither can have
+            // been committed by this topology, so answer false instead of dereferencing null.
+            final String committedTopologyId =
+                committedMetadata == null ? null : committedMetadata.getTopologyId();
+            return committedTopologyId != null && committedTopologyId.equals(context.getStormId());
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize expected commit metadata [{}]."
+                + " This error is expected to occur once per partition, if the last commit to each partition"
+                + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. "
+                + "Defaulting to behavior compatible with earlier version", committedOffset);
+            LOG.trace("", e);
+            return false;
+        }
+    }
+
+    /**
+     * Returns the JSON commit metadata string built in the constructor.
+     *
+     * @return the commit metadata of this manager
+     */
+    public String getCommitMetadata() {
+        return commitMetadata;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
index 7900388..ec2fbac 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -22,8 +22,7 @@ public class KafkaConsumerFactoryDefault<K, V> implements 
KafkaConsumerFactory<K
 
     @Override
     public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> 
kafkaSpoutConfig) {
-        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
-                kafkaSpoutConfig.getKeyDeserializer(), 
kafkaSpoutConfig.getValueDeserializer());
+        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
     }
     
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
new file mode 100755
index 0000000..c9f9541
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages acked and committed offsets for a TopicPartition. This class is not 
thread safe
+ */
+public class OffsetManager {
+    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class);
+
+    private final TopicPartition tp;
+    // Emitted Offsets List
+    private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
+    // Acked messages sorted by ascending order of offset
+    private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);
+    // Committed offset, i.e. the offset where processing will resume upon spout restart.
+    // Initially it is set to fetchOffset.
+    private long committedOffset;
+    // True if this OffsetManager has made at least one commit to Kafka
+    private boolean committed;
+    private long latestEmittedOffset;
+
+    /**
+     * Creates a new OffsetManager.
+     * @param tp The TopicPartition
+     * @param initialFetchOffset The initial fetch offset for the given TopicPartition
+     */
+    public OffsetManager(TopicPartition tp, long initialFetchOffset) {
+        this.tp = tp;
+        this.committedOffset = initialFetchOffset;
+        // Pass the object itself so the string is only built when debug logging is enabled
+        LOG.debug("Instantiated {}", this);
+    }
+
+    /**
+     * Records a message as acked. Acked messages are kept ordered by offset.
+     *
+     * @param msgId The id of the acked message
+     */
+    public void addToAckMsgs(KafkaSpoutMessageId msgId) {          // O(Log N)
+        ackedMsgs.add(msgId);
+    }
+
+    /**
+     * Records an offset as emitted and raises the latest emitted offset if needed.
+     *
+     * @param offset The emitted offset
+     */
+    public void addToEmitMsgs(long offset) {
+        this.emittedOffsets.add(offset);  // O(Log N)
+        this.latestEmittedOffset = Math.max(latestEmittedOffset, offset);
+    }
+
+    /**
+     * Gets the number of emitted offsets that have not yet been removed by a commit.
+     *
+     * @return The size of the emitted offsets set
+     */
+    public int getNumUncommittedOffsets() {
+        return this.emittedOffsets.size();
+    }
+
+    /**
+     * Gets the offset of the nth emitted message after the committed offset.
+     * Example: If the committed offset is 0 and offsets 1, 2, 8, 10 have been emitted,
+     * getNthUncommittedOffsetAfterCommittedOffset(3) returns 8.
+     *
+     * @param index The 1-based index of the message to get the offset for. Values below 1 behave like 1.
+     * @return The offset
+     * @throws NoSuchElementException if the index is out of range
+     */
+    public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
+        Iterator<Long> offsetIter = emittedOffsets.iterator();
+        for (int i = 0; i < index - 1; i++) {
+            offsetIter.next();
+        }
+        return offsetIter.next();
+    }
+
+    /**
+     * An offset can only be committed when all emitted records with lower offset have been
+     * acked. This guarantees that all offsets smaller than the committedOffset
+     * have been delivered, or that those offsets no longer exist in Kafka.
+     *
+     * <p>The returned offset points to the earliest uncommitted offset, and matches the semantics
+     * of the KafkaConsumer.commitSync API.
+     *
+     * @param commitMetadata Metadata information to commit to Kafka. It is constant per KafkaSpout
+     *     instance per topology
+     * @return the next OffsetAndMetadata to commit, or null if no offset is
+     *     ready to commit.
+     * @throws IllegalStateException if an acked offset lies below the offset expected next
+     */
+    public OffsetAndMetadata findNextCommitOffset(final String commitMetadata) {
+        boolean found = false;
+        long currOffset;
+        long nextCommitOffset = committedOffset;
+
+        for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // linear scan of a TreeSet, ascending offsets
+            currOffset = currAckedMsg.offset();
+            if (currOffset == nextCommitOffset) {
+                // found the next offset to commit
+                found = true;
+                nextCommitOffset = currOffset + 1;
+            } else if (currOffset > nextCommitOffset) {
+                if (emittedOffsets.contains(nextCommitOffset)) {
+                    LOG.debug("topic-partition [{}] has non-sequential offset [{}]."
+                        + " It will be processed in a subsequent batch.", tp, currOffset);
+                    break;
+                } else {
+                    /*
+                        This case will arise in case of non-sequential offset being processed.
+                        So, if the topic doesn't contain offset = nextCommitOffset (possible
+                        if the topic is compacted or deleted), the consumer should jump to
+                        the next logical point in the topic. Next logical offset should be the
+                        first element after nextCommitOffset in the ascending ordered emitted set.
+                     */
+                    LOG.debug("Processed non-sequential offset."
+                        + " The earliest uncommitted offset is no longer part of the topic."
+                        + " Missing offset: [{}], Processed: [{}]", nextCommitOffset, currOffset);
+                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
+                    if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
+                        LOG.debug("Found committable offset: [{}] after missing offset: [{}],"
+                            + " skipping to the committable offset",
+                            currOffset, nextCommitOffset);
+                        // The acked offset right after the gap is committable. Without marking it
+                        // as found, it would only be reported once a later contiguous offset is acked.
+                        found = true;
+                        nextCommitOffset = currOffset + 1;
+                    } else {
+                        LOG.debug("Topic-partition [{}] has non-sequential offset [{}]."
+                            + " Next offset to commit should be [{}]", tp, currOffset, nextCommitOffset);
+                        break;
+                    }
+                }
+            } else {
+                throw new IllegalStateException("The offset [" + currOffset
+                    + "] is below the current nextCommitOffset "
+                    + "[" + nextCommitOffset + "] for [" + tp + "]."
+                    + " This should not be possible, and likely indicates a bug in the spout's"
+                    + " acking or emit logic.");
+            }
+        }
+
+        OffsetAndMetadata nextCommitOffsetAndMetadata = null;
+        if (found) {
+            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, commitMetadata);
+
+            LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed."
+                + " Processing will resume at offset [{}] upon spout restart",
+                tp, committedOffset, nextCommitOffsetAndMetadata.offset() - 1,
+                nextCommitOffsetAndMetadata.offset());
+        } else {
+            LOG.debug("Topic-partition [{}] has no offsets ready to be committed", tp);
+        }
+        LOG.trace("{}", this);
+        return nextCommitOffsetAndMetadata;
+    }
+
+    /**
+     * Marks an offset as committed. This method has side effects - it sets the
+     * internal state in such a way that future calls to
+     * {@link #findNextCommitOffset(String)} will return offsets greater than or equal to the
+     * offset specified, if any.
+     *
+     * @param committedOffsetAndMeta The committed offset. All lower offsets are expected to have
+     *     been committed.
+     * @return Number of offsets committed in this commit
+     */
+    public long commit(OffsetAndMetadata committedOffsetAndMeta) {
+        committed = true;
+        final long preCommitCommittedOffset = this.committedOffset;
+        long numCommittedOffsets = 0;
+        this.committedOffset = committedOffsetAndMeta.offset();
+        // Acked messages are sorted by offset, so the scan can stop at the first one not below the commit
+        for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
+            if (iterator.next().offset() < committedOffsetAndMeta.offset()) {
+                iterator.remove();
+                numCommittedOffsets++;
+            } else {
+                break;
+            }
+        }
+
+        // Drop every emitted offset strictly below the committed offset
+        emittedOffsets.headSet(committedOffsetAndMeta.offset()).clear();
+
+        LOG.trace("{}", this);
+
+        LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]."
+            + " Processing will resume at [{}] upon spout restart",
+                numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp,
+                this.committedOffset);
+
+        return numCommittedOffsets;
+    }
+
+    /**
+     * Checks if this OffsetManager has committed to Kafka.
+     *
+     * @return true if this OffsetManager has made at least one commit to Kafka, false otherwise
+     */
+    public boolean hasCommitted() {
+        return committed;
+    }
+
+    /**
+     * Checks if a message is currently held in the acked set. Messages are compared by offset only.
+     *
+     * @param msgId The message id to look up
+     * @return true if an acked message with the same offset is present, false otherwise
+     */
+    public boolean contains(KafkaSpoutMessageId msgId) {
+        return ackedMsgs.contains(msgId);
+    }
+
+    @VisibleForTesting
+    boolean containsEmitted(long offset) {
+        return emittedOffsets.contains(offset);
+    }
+
+    /**
+     * Gets the highest offset passed to {@link #addToEmitMsgs(long)} so far.
+     *
+     * @return The latest emitted offset
+     */
+    public long getLatestEmittedOffset() {
+        return latestEmittedOffset;
+    }
+
+    /**
+     * Gets the committed offset, i.e. the offset where processing will resume upon spout restart.
+     *
+     * @return The committed offset
+     */
+    public long getCommittedOffset() {
+        return committedOffset;
+    }
+
+    @Override
+    public final String toString() {
+        return "OffsetManager{"
+            + "topic-partition=" + tp
+            + ", committedOffset=" + committedOffset
+            + ", emittedOffsets=" + emittedOffsets
+            + ", ackedMsgs=" + ackedMsgs
+            + ", latestEmittedOffset=" + latestEmittedOffset
+            + '}';
+    }
+
+    /**
+     * Orders message ids by ascending offset. Two ids with the same offset compare as equal.
+     */
+    private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
+
+        @Override
+        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
+            return Long.compare(m1.offset(), m2.offset());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
new file mode 100644
index 0000000..2a2e1cb
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.Time;
+
+/**
+ * A polled timer: callers ask {@link #isExpiredResetOnTrue()} whether the period has elapsed,
+ * and a positive answer restarts the cycle. Time is read from {@link Time#nanoTime()}.
+ */
+public class Timer {
+    private final long delay;
+    private final long period;
+    private final TimeUnit timeUnit;
+    private final long periodNanos;
+    private long start;
+
+    /**
+     * Creates a timer whose first cycle begins {@code delay} after construction and which
+     * expires once {@code period} has elapsed since the cycle began.
+     *
+     * @param delay    the initial delay before the timer starts
+     * @param period   the period between expirations reported by {@link #isExpiredResetOnTrue()}
+     * @param timeUnit the time unit of delay and period
+     */
+    public Timer(long delay, long period, TimeUnit timeUnit) {
+        this.delay = delay;
+        this.period = period;
+        this.timeUnit = timeUnit;
+        this.periodNanos = timeUnit.toNanos(period);
+        // The first cycle starts in the future, so the initial delay is added to the period
+        this.start = Time.nanoTime() + timeUnit.toNanos(delay);
+    }
+
+    /**
+     * Returns the period given at construction, in units of {@link #getTimeUnit()}.
+     *
+     * @return the period
+     */
+    public long period() {
+        return this.period;
+    }
+
+    /**
+     * Returns the initial delay given at construction, in units of {@link #getTimeUnit()}.
+     *
+     * @return the delay
+     */
+    public long delay() {
+        return this.delay;
+    }
+
+    /**
+     * Returns the time unit that delay and period are expressed in.
+     *
+     * @return the time unit
+     */
+    public TimeUnit getTimeUnit() {
+        return this.timeUnit;
+    }
+
+    /**
+     * Reports whether at least {@code period} has elapsed since the timer was started or last
+     * reset. A {@code true} result resets the timer, so a new cycle starts at the time of the call.
+     *
+     * @return true if the elapsed time is greater than or equal to {@code period}, false otherwise
+     */
+    public boolean isExpiredResetOnTrue() {
+        if (Time.nanoTime() - start < periodNanos) {
+            return false;
+        }
+        start = Time.nanoTime();
+        return true;
+    }
+}
\ No newline at end of file

Reply via email to