STORM-1997: copy state/bolt from storm-kafka to storm-kafka-client
STORM-2225: change spout config to be simpler.
STORM-2228: removed ability to request a single topic go to multiple streams
STORM-2236: Reimplemented manual partition management on top of STORM-2225


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a3e6f60f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a3e6f60f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a3e6f60f

Branch: refs/heads/1.x-branch
Commit: a3e6f60f250dd94a18bee91ffa8d1a0c9b591670
Parents: cbffc00
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Nov 29 21:39:26 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 30 14:48:42 2017 -0600

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      | 296 ++++++++---
 .../TridentKafkaClientWordCountNamedTopics.java |  77 +--
 ...identKafkaClientWordCountWildcardTopics.java |  41 +-
 external/storm-kafka-client/README.md           | 193 +------
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  | 223 +++++++++
 .../FieldNameBasedTupleToKafkaMapper.java       |  48 ++
 .../kafka/bolt/mapper/TupleToKafkaMapper.java   |  32 ++
 .../bolt/selector/DefaultTopicSelector.java     |  34 ++
 .../bolt/selector/FieldIndexTopicSelector.java  |  52 ++
 .../bolt/selector/FieldNameTopicSelector.java   |  49 ++
 .../kafka/bolt/selector/KafkaTopicSelector.java |  26 +
 .../kafka/spout/ByTopicRecordTranslator.java    | 149 ++++++
 .../kafka/spout/DefaultRecordTranslator.java    |  47 ++
 .../java/org/apache/storm/kafka/spout/Func.java |  26 +
 .../apache/storm/kafka/spout/KafkaSpout.java    | 178 ++-----
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 498 +++++++++++++------
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |   2 +-
 .../storm/kafka/spout/KafkaSpoutStream.java     | 121 -----
 .../storm/kafka/spout/KafkaSpoutStreams.java    |  38 --
 .../spout/KafkaSpoutStreamsNamedTopics.java     | 165 ------
 .../spout/KafkaSpoutStreamsWildcardTopics.java  |  67 ---
 .../kafka/spout/KafkaSpoutTupleBuilder.java     |  58 ---
 .../kafka/spout/KafkaSpoutTuplesBuilder.java    |  32 --
 .../KafkaSpoutTuplesBuilderNamedTopics.java     |  78 ---
 .../KafkaSpoutTuplesBuilderWildcardTopics.java  |  36 --
 .../apache/storm/kafka/spout/KafkaTuple.java    |  47 ++
 .../spout/ManualPartitionNamedSubscription.java |  78 +++
 .../ManualPartitionPatternSubscription.java     |  76 +++
 .../storm/kafka/spout/ManualPartitioner.java    |  40 ++
 .../storm/kafka/spout/NamedSubscription.java    |  61 +++
 .../storm/kafka/spout/PatternSubscription.java  |  54 ++
 .../storm/kafka/spout/RecordTranslator.java     |  53 ++
 .../spout/RoundRobinManualPartitioner.java      |  50 ++
 .../kafka/spout/SimpleRecordTranslator.java     |  58 +++
 .../apache/storm/kafka/spout/Subscription.java  |  53 ++
 .../kafka/spout/TopicPartitionComparator.java   |  49 ++
 .../storm/kafka/spout/internal/Timer.java       |  74 +++
 .../spout/trident/KafkaTridentSpoutEmitter.java |  60 ++-
 .../spout/trident/KafkaTridentSpoutManager.java |  58 +--
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  12 +-
 .../storm/kafka/trident/TridentKafkaState.java  | 115 +++++
 .../kafka/trident/TridentKafkaStateFactory.java |  63 +++
 .../FieldNameBasedTupleToKafkaMapper.java       |  41 ++
 .../mapper/TridentTupleToKafkaMapper.java       |  28 ++
 .../trident/selector/DefaultTopicSelector.java  |  34 ++
 .../trident/selector/KafkaTopicSelector.java    |  26 +
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  |  91 ++++
 .../spout/ByTopicRecordTranslatorTest.java      |  93 ++++
 .../spout/DefaultRecordTranslatorTest.java      |  37 ++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  40 ++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  49 +-
 .../spout/KafkaSpoutStreamsNamedTopicsTest.java |  38 --
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   2 +-
 .../SingleTopicKafkaSpoutConfiguration.java     |  80 ++-
 .../builders/TopicKeyValueTupleBuilder.java     |  40 --
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  80 ++-
 .../KafkaSpoutTopologyMainWildcardTopics.java   |  50 +-
 .../spout/test/TopicTest2TupleBuilder.java      |  40 --
 .../test/TopicsTest0Test1TupleBuilder.java      |  42 --
 .../kafka/DynamicPartitionConnections.java      |   2 +-
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |   2 +-
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  |   2 +-
 .../apache/storm/kafka/PartitionManager.java    |   2 +-
 .../apache/storm/kafka/StaticCoordinator.java   |   4 +-
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  |   2 +
 .../storm/kafka/DynamicBrokersReaderTest.java   |   6 +-
 .../apache/storm/kafka/TridentKafkaTest.java    |   2 +-
 .../apache/storm/kafka/ZkCoordinatorTest.java   |   2 +-
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  |   7 +-
 .../src/jvm/org/apache/storm/tuple/Fields.java  |  18 +-
 70 files changed, 2868 insertions(+), 1559 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index c8e038f..90aca55 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -1,90 +1,257 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka consumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka versions 
0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting at 
offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from the 
offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach 
it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentKafkaState, 
org.apache.storm.kafka.trident.TridentKafkaStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from 
`ConsumerRecord`s. The logic is provided by the user through implementing the 
appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use 
`KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` 
implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s 
is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+    K getKeyFromTuple(Tuple/TridentTuple tuple);
+    V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka key 
and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a field 
with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper for 
backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key and 
message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+    String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published.
+You can return a null and the message will be ignored. If you have one static 
topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to select 
the topic a tuple should be published to.
+A user just needs to specify the field name or field index for the topic name 
in the tuple itself.
+When the topic name is not found, the `Field*TopicSelector` will write 
messages into the default topic.
+Please make sure the default topic has been created.
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by calling 
`KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more details.
+These are also defined in `org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+     Config config = new Config();
+     config.put("kafka.topic.wildcard.match",true);
+
+```
+
+After this you can specify a wildcard topic for matching e.g. 
clickstream.*.log.  This will match all streams matching clickstream.my.log, 
clickstream.cart.log etc
+
+
+###Putting it all together
+
+For the bolt :
+```java
+        TopologyBuilder builder = new TopologyBuilder();
+
+        Fields fields = new Fields("key", "message");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                    new Values("storm", "1"),
+                    new Values("trident", "1"),
+                    new Values("needs", "1"),
+                    new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+        builder.setSpout("spout", spout, 5);
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("acks", "1");
+        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+
+        KafkaBolt bolt = new KafkaBolt()
+                .withProducerProperties(props)
+                .withTopicSelector(new DefaultTopicSelector("test"))
+                .withTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper());
+        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+        Config conf = new Config();
+
+        StormSubmitter.submitTopology("kafkaboltTest", conf, 
builder.createTopology());
+```
+
+For Trident:
+
+```java
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", "1"),
+                new Values("trident", "1"),
+                new Values("needs", "1"),
+                new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("acks", "1");
+        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+
+        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withProducerProperties(props)
+                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+                .withTridentTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper("word", "count"));
+        stream.partitionPersist(stateFactory, fields, new 
TridentKafkaUpdater(), new Fields());
+
+        Config conf = new Config();
+        StormSubmitter.submitTopology("kafkaTridentTest", conf, 
topology.build());
+```
+
+## Reading From kafka (Spouts)
+
+### Configuration
+
+The spout implementations are configured by use of the `KafkaSpoutConfig` 
class.  This class uses a Builder pattern and can be started either by calling 
one of
+the Builders constructors or by calling the static method builder in the 
KafkaSpoutConfig class.
+
+The constructor or static method used to create the builder requires a few key 
values. These can be changed later on, but they are the minimum config needed to start
+a spout.
+
+`bootstrapServers` is the same as the Kafka Consumer Property 
"bootstrap.servers".
+`topics` The topics the spout will consume can either be a `Collection` of 
specific topic names (1 or more) or a regular expression `Pattern`, which 
specifies
+that any topics that match that regular expression will be consumed.
 
-# Usage Examples
+In the case of the Constructors you may also need to specify a key 
deserializer and a value deserializer.  This is to help guarantee type safety 
through the use
+of Java generics.  The defaults are `StringDeserializer`s and can be 
overwritten by calling `setKeyDeserializer` and/or `setValueDeserializer`.
+If these are set to null the code will fall back to what is set in the kafka 
properties, but it is preferable to be explicit here, again to maintain 
+type safety with the generics.
 
-### Create a Kafka Spout
+There are a few key configs to pay attention to.
 
-The code snippet bellow is extracted from the example in the module [test] 
(https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test).
 The code that is common for named topics and topic wildcards is in the first 
box. The specific implementations are in the appropriate section. 
+`setFirstPollOffsetStrategy` allows you to set where to start consuming data 
from.  This is used both in case of failure recovery and starting the spout
+for the first time. Allowed values include
 
-These snippets serve as a reference and do not compile. If you would like to 
reuse this code in your implementation, please obtain it from the test module, 
where it is complete.
+ * `EARLIEST` means that the kafka spout polls records starting in the first 
offset of the partition, regardless of previous commits
+ * `LATEST` means that the kafka spout polls records with offsets greater than 
the last offset in the partition, regardless of previous commits
+ * `UNCOMMITTED_EARLIEST` (DEFAULT) means that the kafka spout polls records 
from the last committed offset, if any. If no offset has been committed, it 
behaves as `EARLIEST`.
+ * `UNCOMMITTED_LATEST` means that the kafka spout polls records from the last 
committed offset, if any. If no offset has been committed, it behaves as 
`LATEST`.
 
+`setRecordTranslator` allows you to modify how the spout converts a Kafka 
Consumer Record into a Tuple, and which stream that tuple will be published 
into.
+By default the "topic", "partition", "offset", "key", and "value" will be 
emitted to the "default" stream.  If you want to output entries to different
+streams based on the topic, storm provides `ByTopicRecordTranslator`.  See 
below for more examples on how to use these.
+
+`setProp` can be used to set kafka properties that do not have a convenience 
method.
+
+`setGroupId` lets you set the id of the kafka consumer group property 
"group.id".
+
+`setSSLKeystore` and `setSSLTruststore` allow you to configure SSL 
authentication.
+
+### Usage Examples
+
+The API is written with java 8 lambda expressions in mind.  It works with 
java7 and below though.
+
+#### Create a Simple Insecure Spout
+The following will consume all events published to "topic" and send them to 
MyBolt with the fields "topic", "partition", "offset", "key", "value".
 ```java
-KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
-
-KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, 
String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
-        .setOffsetCommitPeriodMs(10_000)
-        .setFirstPollOffsetStrategy(EARLIEST)
-        .setMaxUncommittedOffsets(250)
-        .build();
-
-Map<String, Object> kafkaConsumerProps= new HashMap<>();
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS,"127.0.0.1:9092");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID,"kafkaSpoutTestGroup");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
-
-KafkaSpoutRetryService retryService = new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
-        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), 
Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
+
+final TopologyBuilder tp = new TopologyBuilder();
+tp.setSpout("kafka_spout", new 
KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic").build()), 
1);
+tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
+...
+
 ```
 
-### Named Topics
+#### Wildcard Topics
+Wildcard topics will consume from all topics that exist in the specified 
brokers list and match the pattern.  So in the following example
+"topic", "topic_foo" and "topic_bar" will all match the pattern "topic.*", but 
"not_my_topic" would not match. 
 ```java
-KafkaSpoutStreams kafkaSpoutStreams = new 
KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new 
String[]{TOPICS[0], TOPICS[1]})
-            .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // 
contents of topic test2 sent to test_stream
-            .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // 
contents of topic test2 sent to test2_stream
-            .build();
-            
-KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new 
KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-            new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], 
TOPICS[1]),
-            new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
-            .build();
-            
-String[] STREAMS = new String[]{"test_stream", "test1_stream", "test2_stream"};
-String[] TOPICS = new String[]{"test", "test1", "test2"};
-
-Fields outputFields = new Fields("topic", "partition", "offset", "key", 
"value");
-Fields outputFields1 = new Fields("topic", "partition", "offset");
+
+final TopologyBuilder tp = new TopologyBuilder();
+tp.setSpout("kafka_spout", new 
KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, 
Pattern.compile("topic.*")).build()), 1);
+tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
+...
+
 ```
 
-### Topic Wildcards
+#### Multiple Streams
+This uses java 8 lambda expressions.
 ```java
-KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsWildcardTopics(
-            new KafkaSpoutStream(outputFields, STREAM, 
Pattern.compile(TOPIC_WILDCARD_PATTERN)));
 
-KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new 
TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
+final TopologyBuilder tp = new TopologyBuilder();
+
+//By default all topics not covered by another rule, but consumed by the spout 
will be emitted to "STREAM_1" as "topic", "key", and "value"
+ByTopicRecordTranslator<String, String> byTopic = new 
ByTopicRecordTranslator<>(
+    (r) -> new Values(r.topic(), r.key(), r.value()),
+    new Fields("topic", "key", "value"), "STREAM_1");
+//For topic_2 all events will be emitted to "STREAM_2" as just "key" and 
"value"
+byTopic.forTopic("topic_2", (r) -> new Values(r.key(), r.value()), new 
Fields("key", "value"), "STREAM_2");
+
+tp.setSpout("kafka_spout", new 
KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic_1", 
"topic_2", "topic_3").build()), 1);
+tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1");
+tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", 
"STREAM_2");
+...
 
-String STREAM = "test_wildcard_stream";
-String TOPIC_WILDCARD_PATTERN = "test[1|2]";
+```
+
+#### Trident
+
+```java
+final TridentTopology tridentTopology = new TridentTopology();
+final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
+    new KafkaTridentSpoutOpaque<>(KafkaSpoutConfig.builder("127.0.0.1:" + 
port, Pattern.compile("topic.*")).build()))
+      .parallelismHint(1)
+...
 
-Fields outputFields = new Fields("topic", "partition", "offset", "key", 
"value");
 ```
 
-### Create a simple Toplogy using the Kafka Spout:
+Trident does not support multiple streams and will ignore any streams set for 
output.  If however the Fields are not identical for each
+output topic it will throw an exception and not continue.
+
+### Custom RecordTranslators (ADVANCED)
+
+In most cases the built in SimpleRecordTranslator and ByTopicRecordTranslator 
should cover your use case.  If you do run into a situation where you need a 
custom one
+then this documentation will describe how to do this properly, and some of the 
less than obvious classes involved.
 
+The point of apply is to take a ConsumerRecord and turn it into a 
`List<Object>` that can be emitted.  What is not obvious is how to tell the 
spout to emit it to a
+specific stream.  To do this you will need to return an instance of 
`org.apache.storm.kafka.spout.KafkaTuple`.  This provides a method `routedTo` 
that will say which
+specific stream the tuple should go to.
 
+For Example:
 ```java
-TopologyBuilder tp = new TopologyBuilder();
-tp.setSpout("kafka_spout", new 
KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
-tp.setBolt("kafka_bolt", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-tp.setBolt("kafka_bolt_1", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
-tp.createTopology();
+return new KafkaTuple(1, 2, 3, 4).routedTo("bar");
 ```
 
-# Build And Run Bundled Examples  
-To be able to run the examples you must first build the java code in the 
package `storm-kafka-client`, 
-and then generate an uber jar with all the dependencies.
+Will cause the tuple to be emitted on the "bar" stream.
+
+Be careful when writing custom record translators because just like in a storm 
spout it needs to be self consistent.  The `streams` method should return
+a full set of streams that this translator will ever try to emit on.  
Additionally `getFieldsFor` should return a valid Fields object for each of 
those
+streams.  If you are doing this for Trident a value must be in the List 
returned by apply for every field in the Fields object for that stream,
+otherwise trident can throw exceptions.
+
+
+### Manual Partition Control (ADVANCED)
+
+By default Kafka will automatically assign partitions to the current set of 
spouts.  It handles lots of things, but in some cases you may want to manually 
assign the partitions.
+This can cause less churn in the assignments when spouts go down and come back 
up, but it can result in a lot of issues if not done right.  This can all be 
handled by subclassing
+Subscription and we have a few implementations that you can look at for 
examples on how to do this.  ManualPartitionNamedSubscription and 
ManualPartitionPatternSubscription.  Again
+please be careful when using these or implementing your own.
 
 ## Use the Maven Shade Plugin to Build the Uber Jar
 
@@ -112,7 +279,7 @@ Add the following to 
`REPO_HOME/storm/external/storm-kafka-client/pom.xml`
 </plugin>
 ```
 
-create the uber jar by running the commmand:
+create the uber jar by running the command:
 
 `mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml`
 
@@ -122,11 +289,11 @@ This will create the uber jar file with the name and 
location matching the follo
 
 ### Run Storm Topology
 
-Copy the file 
`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
 to `STORM_HOME/extlib`
+Copy the file 
`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-*.jar` 
to `STORM_HOME/extlib`
 
 Using the Kafka command line tools create three topics [test, test1, test2] 
and use the Kafka console producer to populate the topics with some data 
 
-Execute the command `STORM_HOME/bin/storm jar 
REPO_HOME/storm/external/storm/target/storm-kafka-client-1.0.x.jar 
org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
+Execute the command `STORM_HOME/bin/storm jar 
REPO_HOME/storm/external/storm/target/storm-kafka-client-*.jar 
org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
 
 With the debug level logs enabled it is possible to see the messages of each 
topic being redirected to the appropriate Bolt as defined 
 by the streams defined and choice of shuffle grouping.   
@@ -181,8 +348,3 @@ Currently the Kafka spout has has the following default 
values, which have shown
 * offset.commit.period.ms = 30000   (30s)
 * max.uncommitted.offsets = 10000000
 <br/>
-
-There will be a blog post coming soon analyzing the trade-offs of this tuning 
parameters, and comparing the performance of the Kafka Spouts using the Kafka 
client API introduced in 0.9 (new implementation) and in prior versions (prior 
implementation)
-
-#Future Work
- Implement comprehensive metrics. Trident spout is coming soon.

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index db26549..1ae69c8 100644
--- 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -18,87 +18,58 @@
 
 package org.apache.storm.kafka.trident;
 
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
-import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager;
 import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
 public class TridentKafkaClientWordCountNamedTopics {
     private static final String TOPIC_1 = "test-trident";
     private static final String TOPIC_2 = "test-trident-1";
     private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
 
     private KafkaTridentSpoutOpaque<String, String> 
newKafkaTridentSpoutOpaque() {
-        return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
-                        newKafkaSpoutConfig(
-                        newKafkaSpoutStreams())));
+        return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
     }
 
-    private KafkaSpoutConfig<String,String> 
newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-        return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
-                    kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
+    private static Func<ConsumerRecord<String, String>, List<Object>> 
JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    };
+    
+    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
+        return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2)
+                .setGroupId("kafkaSpoutTestGroup")
+                .setMaxPartitionFectchBytes(200)
+                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
+                .setRetry(newRetryService())
                 .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
                 .setMaxUncommittedOffsets(250)
                 .build();
     }
 
-    protected Map<String,Object> newKafkaConsumerProps() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, 
"127.0.0.1:9092");
-        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
"org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
"org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("max.partition.fetch.bytes", 200);
-        return props;
-    }
-
-    protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-                new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
-                .build();
-    }
-
     protected KafkaSpoutRetryService newRetryService() {
-        return new KafkaSpoutRetryExponentialBackoff(new 
KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
-                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
-                Integer.MAX_VALUE, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
-    }
-
-    protected KafkaSpoutStreams newKafkaSpoutStreams() {
-        return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"), new 
String[]{"test-trident","test-trident-1"}).build();
-    }
-
-    protected static class TopicsTupleBuilder<K, V> extends 
KafkaSpoutTupleBuilder<K,V> {
-        public TopicsTupleBuilder(String... topics) {
-            super(topics);
-        }
-        @Override
-        public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-            return new Values(consumerRecord.value());
-        }
+        return new KafkaSpoutRetryExponentialBackoff(new TimeInterval(500L, 
TimeUnit.MICROSECONDS),
+                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
     }
 
     public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
index 02471c5..94a9765 100644
--- 
a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
+++ 
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
@@ -18,26 +18,37 @@
 
 package org.apache.storm.kafka.trident;
 
-import org.apache.storm.kafka.spout.KafkaSpoutStream;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderWildcardTopics;
-import org.apache.storm.tuple.Fields;
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
+import java.util.List;
 import java.util.regex.Pattern;
 
-public class TridentKafkaClientWordCountWildcardTopics extends 
TridentKafkaClientWordCountNamedTopics {
-    private static final String TOPIC_WILDCARD_PATTERN = "test-trident(-1)?";
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.Func;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
-    protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderWildcardTopics<>(new 
TopicsTupleBuilder<String, String>(TOPIC_WILDCARD_PATTERN));
-    }
+public class TridentKafkaClientWordCountWildcardTopics extends 
TridentKafkaClientWordCountNamedTopics {
+    private static final Pattern TOPIC_WILDCARD_PATTERN = 
Pattern.compile("test-trident(-1)?");
 
-    protected KafkaSpoutStreams newKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("str");
-        final KafkaSpoutStream kafkaSpoutStream = new 
KafkaSpoutStream(outputFields, Pattern.compile(TOPIC_WILDCARD_PATTERN));
-        return new KafkaSpoutStreamsWildcardTopics(kafkaSpoutStream);
+    private static Func<ConsumerRecord<String, String>, List<Object>> 
JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    };
+    
+    /**
+     * Builds the spout configuration for the wildcard-topic variant of the example.
+     * The spout connects to the broker at 127.0.0.1:9092 and subscribes to every topic
+     * matching {@code TOPIC_WILDCARD_PATTERN}; each record is translated by
+     * {@code JUST_VALUE_FUNC} into a tuple with the single field "str".
+     * Retries come from {@code newRetryService()}, and the first poll starts from the
+     * EARLIEST offset strategy.
+     * NOTE(review): presumably this overrides the method of the same name in the
+     * parent class; the parent is not visible here, so confirm before adding @Override.
+     *
+     * @return the spout configuration produced by the builder
+     */
+    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
+        return KafkaSpoutConfig.builder("127.0.0.1:9092", 
TOPIC_WILDCARD_PATTERN)
+                .setGroupId("kafkaSpoutTestGroup")
+                .setMaxPartitionFectchBytes(200)
+                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
+                .setRetry(newRetryService())
+                .setOffsetCommitPeriodMs(10_000)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setMaxUncommittedOffsets(250)
+                .build();
     }
 
     public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/README.md 
b/external/storm-kafka-client/README.md
index 5784b8a..465e466 100644
--- a/external/storm-kafka-client/README.md
+++ b/external/storm-kafka-client/README.md
@@ -1,192 +1,5 @@
-#Storm Kafka Spout with New Kafka Consumer API
+# Storm Apache Kafka integration using the kafka-client jar (this includes the new Apache Kafka consumer API)
 
-Apache Storm Spout implementation to consume data from Apache Kafka versions 
0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+Spouts and Bolts that write to and read from Kafka through the kafka-client 
library.
 
-The Apache Storm Spout allows clients to consume data from Kafka starting at 
offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from the 
offset that matches the chosen `FirstPollOffsetStrategy`.
-
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
-
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from 
`ConsumerRecord`s. The logic is provided by the user through implementing the 
appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use 
`KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
-
-Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` 
implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s 
is identical.
-
-
-# Usage Examples
-
-### Create a Kafka Spout
-
-The code snippet bellow is extracted from the example in the module [test] 
(https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test).
 The code that is common for named topics and topic wildcards is in the first 
box. The specific implementations are in the appropriate section. 
-
-These snippets serve as a reference and do not compile. If you would like to 
reuse this code in your implementation, please obtain it from the test module, 
where it is complete.
-
-```java
-KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
-
-KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, 
String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
-        .setOffsetCommitPeriodMs(10_000)
-        .setFirstPollOffsetStrategy(EARLIEST)
-        .setMaxUncommittedOffsets(250)
-        .build();
-
-Map<String, Object> kafkaConsumerProps= new HashMap<>();
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS,"127.0.0.1:9092");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID,"kafkaSpoutTestGroup");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
-
-KafkaSpoutRetryService retryService = new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
-        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), 
Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
-```
-
-### Named Topics
-```java
-KafkaSpoutStreams kafkaSpoutStreams = new 
KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new 
String[]{TOPICS[0], TOPICS[1]})
-            .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // 
contents of topic test2 sent to test_stream
-            .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // 
contents of topic test2 sent to test2_stream
-            .build();
-            
-KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new 
KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-            new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], 
TOPICS[1]),
-            new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
-            .build();
-            
-String[] STREAMS = new String[]{"test_stream", "test1_stream", "test2_stream"};
-String[] TOPICS = new String[]{"test", "test1", "test2"};
-
-Fields outputFields = new Fields("topic", "partition", "offset", "key", 
"value");
-Fields outputFields1 = new Fields("topic", "partition", "offset");
-```
-
-### Topic Wildcards
-```java
-KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsWildcardTopics(
-            new KafkaSpoutStream(outputFields, STREAM, 
Pattern.compile(TOPIC_WILDCARD_PATTERN)));
-
-KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new 
TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
-
-String STREAM = "test_wildcard_stream";
-String TOPIC_WILDCARD_PATTERN = "test[1|2]";
-
-Fields outputFields = new Fields("topic", "partition", "offset", "key", 
"value");
-```
-
-### Create a simple Toplogy using the Kafka Spout:
-
-
-```java
-TopologyBuilder tp = new TopologyBuilder();
-tp.setSpout("kafka_spout", new 
KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
-tp.setBolt("kafka_bolt", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-tp.setBolt("kafka_bolt_1", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
-tp.createTopology();
-```
-
-# Build And Run Bundled Examples  
-To be able to run the examples you must first build the java code in the 
package `storm-kafka-client`, 
-and then generate an uber jar with all the dependencies.
-
-## Use the Maven Shade Plugin to Build the Uber Jar
-
-Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml`
-```xml
-<plugin>
-    <groupId>org.apache.maven.plugins</groupId>
-    <artifactId>maven-shade-plugin</artifactId>
-    <version>2.4.1</version>
-    <executions>
-        <execution>
-            <phase>package</phase>
-            <goals>
-                <goal>shade</goal>
-            </goals>
-            <configuration>
-                <transformers>
-                    <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                        
<mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
-                    </transformer>
-                </transformers>
-            </configuration>
-        </execution>
-    </executions>
-</plugin>
-```
-
-create the uber jar by running the commmand:
-
-`mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml`
-
-This will create the uber jar file with the name and location matching the 
following pattern:
- 
-`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
-
-### Run Storm Topology
-
-Copy the file 
`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
 to `STORM_HOME/extlib`
-
-Using the Kafka command line tools create three topics [test, test1, test2] 
and use the Kafka console producer to populate the topics with some data 
-
-Execute the command `STORM_HOME/bin/storm jar 
REPO_HOME/storm/external/storm/target/storm-kafka-client-1.0.x.jar 
org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
-
-With the debug level logs enabled it is possible to see the messages of each 
topic being redirected to the appropriate Bolt as defined 
-by the streams defined and choice of shuffle grouping.
-
-## Using storm-kafka-client with different versions of kafka
-
-Storm-kafka-client's Kafka dependency is defined as `provided` scope in maven, 
meaning it will not be pulled in
-as a transitive dependency. This allows you to use a version of Kafka 
dependency compatible with your kafka cluster.
-
-When building a project with storm-kafka-client, you must explicitly add the 
Kafka clients dependency. For example, to
-use Kafka-clients 0.10.0.0, you would use the following dependency in your 
`pom.xml`:
-
-```xml
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>0.10.0.0</version>
-        </dependency>
-```
-
-You can also override the kafka clients version while building from maven, 
with parameter `storm.kafka.client.version`
-e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
-
-When selecting a kafka client version, you should ensure -
- 1. kafka api is compatible. storm-kafka-client module only supports **0.10 or 
newer** kafka client API. For older versions,
- you can use storm-kafka module 
(https://github.com/apache/storm/tree/master/external/storm-kafka).
- 2. The kafka client selected by you should be wire compatible with the 
broker. e.g. 0.9.x client will not work with
- 0.8.x broker.
-
-
-#Kafka Spout Performance Tuning
-
-The Kafka spout provides two internal parameters to control its performance. 
The parameters can be set using the [KafkaSpoutConfig] 
(https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java)
 methods [setOffsetCommitPeriodMs] 
(https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193)
 and [setMaxUncommittedOffsets] 
(https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217).
 
-
-* "offset.commit.period.ms" controls how often the spout commits to Kafka
-* "max.uncommitted.offsets" controls how many offsets can be pending commit 
before another poll can take place
-<br/>
-
-The [Kafka consumer config] 
(http://kafka.apache.org/documentation.html#consumerconfigs) parameters may 
also have an impact on the performance of the spout. The following Kafka 
parameters are likely the most influential in the spout performance: 
-
-* “fetch.min.bytes”
-* “fetch.max.wait.ms”
-* [Kafka Consumer] 
(http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
 instance poll timeout, which is specified for each Kafka spout using the 
[KafkaSpoutConfig] 
(https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java)
 method [setPollTimeoutMs] 
(https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
-<br/>
-
-Depending on the structure of your Kafka cluster, distribution of the data, 
and availability of data to poll, these parameters will have to be configured 
appropriately. Please refer to the Kafka documentation on Kafka parameter 
tuning.
-
-###Default values
-
-Currently the Kafka spout has has the following default values, which have 
shown to give good performance in the test environment as described in this 
[blog post] 
(https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)
-
-* poll.timeout.ms = 200
-* offset.commit.period.ms = 30000   (30s)
-* max.uncommitted.offsets = 10000000
-<br/>
-
-There will be a blog post coming soon analyzing the trade-offs of this tuning 
parameters, and comparing the performance of the Kafka Spouts using the Kafka 
client API introduced in 0.9 (new implementation) and in prior versions (prior 
implementation)
-
-#Future Work
- Implement comprehensive metrics. Trident spout is coming soon.
-
-## Committer Sponsors
- * Sriharsha Chintalapani ([srihar...@apache.org](mailto:srihar...@apache.org))
+Please see [here](../../docs/storm-kafka-client.md) for details on how to use 
it.

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
new file mode 100644
index 0000000..84d3334
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka.
+ * <p/>
+ * Most configuration for this bolt should be through the various 
+ * setter methods in the bolt.
+ * For backwards compatibility it supports the producer
+ * configuration and topic to be placed in the storm config under
+ * <p/>
+ * 'kafka.broker.properties' and 'topic'
+ * <p/>
+ * respectively.
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K,V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecifiedProperties = new Properties();
+    /**
+     * {@see KafkaBolt#setFireAndForget(boolean)} for more details on this. 
+     */
+    private boolean fireAndForget = false;
+    /**
+     * {@see KafkaBolt#setAsync(boolean)} for more details on this. 
+     */
+    private boolean async = true;
+
+    public KafkaBolt() {}
+
+    public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> 
mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    /**
+     * Set the messages to be published to a single topic
+     * @param topic the topic to publish to
+     * @return this
+     */
+    public KafkaBolt<K, V> withTopicSelector(String topic) {
+        return withTopicSelector(new DefaultTopicSelector(topic));
+    }
+    
+    public KafkaBolt<K,V> withTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public KafkaBolt<K,V> withProducerProperties(Properties 
producerProperties) {
+        this.boltSpecifiedProperties = producerProperties;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+        LOG.info("Preparing bolt with configuration {}", this);
+        //for backward compatibility.
+        if (mapper == null) {
+            LOG.info("Mapper not specified. Setting default mapper to {}", 
FieldNameBasedTupleToKafkaMapper.class.getSimpleName());
+            this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>();
+        }
+
+        //for backward compatibility.
+        if (topicSelector == null) {
+            if (stormConf.containsKey(TOPIC)) {
+                LOG.info("TopicSelector not specified. Using [{}] for topic 
[{}] specified in bolt configuration,",
+                        DefaultTopicSelector.class.getSimpleName(), 
stormConf.get(TOPIC));
+                this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+            } else {
+                throw new IllegalStateException("topic should be specified in 
bolt's configuration");
+            }
+        }
+
+        producer = mkProducer(boltSpecifiedProperties);
+        this.collector = collector;
+    }
+    
+    /**
+     * Intended to be overridden for tests.  Make the producer with the given 
props
+     */
+    protected KafkaProducer<K, V> mkProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    @Override
+    public void execute(final Tuple input) {
+        if (TupleUtils.isTick(input)) {
+          collector.ack(input);
+          return; // Do not try to send ticks to Kafka
+        }
+        K key = null;
+        V message = null;
+        String topic = null;
+        try {
+            key = mapper.getKeyFromTuple(input);
+            message = mapper.getMessageFromTuple(input);
+            topic = topicSelector.getTopic(input);
+            if (topic != null ) {
+                Callback callback = null;
+
+                if (!fireAndForget && async) {
+                    callback = new Callback() {
+                        @Override
+                        public void onCompletion(RecordMetadata ignored, 
Exception e) {
+                            synchronized (collector) {
+                                if (e != null) {
+                                    collector.reportError(e);
+                                    collector.fail(input);
+                                } else {
+                                    collector.ack(input);
+                                }
+                            }
+                        }
+                    };
+                }
+                Future<RecordMetadata> result = producer.send(new 
ProducerRecord<K, V>(topic, key, message), callback);
+                if (!async) {
+                    try {
+                        result.get();
+                        collector.ack(input);
+                    } catch (ExecutionException err) {
+                        collector.reportError(err);
+                        collector.fail(input);
+                    }
+                } else if (fireAndForget) {
+                    collector.ack(input);
+                }
+            } else {
+                LOG.warn("skipping key = " + key + ", topic selector returned 
null.");
+                collector.ack(input);
+            }
+        } catch (Exception ex) {
+            collector.reportError(ex);
+            collector.fail(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    @Override
+    public void cleanup() {
+        producer.close();
+    }
+
+    /**
+     * If set to true the bolt will assume that sending a message to kafka 
will succeed and will ack
+     * the tuple as soon as it has handed the message off to the producer API
+     * if false (the default) the message will be acked after it was 
successfully sent to kafka or
+     * failed if it was not successfully sent.
+     * @param fireAndForget
+     */
+    public void setFireAndForget(boolean fireAndForget) {
+        this.fireAndForget = fireAndForget;
+    }
+
+    /**
+     * If set to true(the default) the bolt will not wait for the message
+     * to be fully sent to Kafka before getting another tuple to send.
+     * @param async true to have multiple tuples in flight to kafka, else 
false.
+     */
+    public void setAsync(boolean async) {
+        this.async = async;
+    }
+    
+    @Override
+    public String toString() {
+        return "KafkaBolt: {mapper: " + mapper +
+                " topicSelector: " + topicSelector +
+                " fireAndForget: " + fireAndForget +
+                " async: " + async +
+                " proerties: " + boltSpecifiedProperties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..f7638aa
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.mapper;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Mapper that takes the Kafka key and message from two named fields of the tuple.
+ * The field names default to {@link #BOLT_KEY} and {@link #BOLT_MESSAGE}.
+ */
+public class FieldNameBasedTupleToKafkaMapper<K,V> implements TupleToKafkaMapper<K, V> {
+    private static final long serialVersionUID = -8794262989021702349L;
+    public static final String BOLT_KEY = "key";
+    public static final String BOLT_MESSAGE = "message";
+    public String boltKeyField;
+    public String boltMessageField;
+
+    /** Creates a mapper that uses the default field names "key" and "message". */
+    public FieldNameBasedTupleToKafkaMapper() {
+        this(BOLT_KEY, BOLT_MESSAGE);
+    }
+
+    /**
+     * @param boltKeyField name of the tuple field holding the Kafka key
+     * @param boltMessageField name of the tuple field holding the Kafka message
+     */
+    public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) {
+        this.boltKeyField = boltKeyField;
+        this.boltMessageField = boltMessageField;
+    }
+
+    /**
+     * @return the value of the key field, or null when the tuple has no such field
+     */
+    @Override
+    public K getKeyFromTuple(Tuple tuple) {
+        //for backward compatibility, we return null when key is not present.
+        if (!tuple.contains(boltKeyField)) {
+            return null;
+        }
+        return (K) tuple.getValueByField(boltKeyField);
+    }
+
+    /**
+     * @return the value of the message field, cast to the value type
+     */
+    @Override
+    public V getMessageFromTuple(Tuple tuple) {
+        final Object rawMessage = tuple.getValueByField(boltMessageField);
+        return (V) rawMessage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
new file mode 100644
index 0000000..9f11fc9
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.mapper;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+/**
+ * Interface defining a mapping from storm tuple to kafka key and message.
+ * @param <K> type of key.
+ * @param <V> type of value.
+ */
+public interface TupleToKafkaMapper<K,V> extends Serializable {
+    /**
+     * Produces the Kafka key for the given tuple.
+     * @param tuple the storm tuple being mapped
+     * @return the key; NOTE(review): implementations may return null (the field-name
+     *     based mapper does when the key field is absent) - confirm callers accept that
+     */
+    K getKeyFromTuple(Tuple tuple);
+    /**
+     * Produces the Kafka message (record value) for the given tuple.
+     * @param tuple the storm tuple being mapped
+     * @return the message
+     */
+    V getMessageFromTuple(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..3d00fc1
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Topic selector that ignores the tuple and always answers with one fixed topic.
+ */
+public class DefaultTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = 4601118062437851265L;
+    private final String topicName;
+
+    /**
+     * @param topicName the topic returned for every tuple
+     */
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    /**
+     * @param tuple not inspected
+     * @return the topic name given at construction
+     */
+    @Override
+    public String getTopic(Tuple tuple) {
+        return this.topicName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
new file mode 100644
index 0000000..ffe0b35a
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Picks the Kafka topic name from the tuple field at a fixed index, falling back to a default topic.
+ */
+public class FieldIndexTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -3830575380208166367L;
+    private static final Logger LOG = LoggerFactory.getLogger(FieldIndexTopicSelector.class);
+
+    private final int fieldIndex;
+    private final String defaultTopicName;
+
+    /** @throws IllegalArgumentException if {@code fieldIndex} is negative */
+    public FieldIndexTopicSelector(int fieldIndex, String defaultTopicName) {
+        if (fieldIndex < 0) {
+            throw new IllegalArgumentException("fieldIndex cannot be negative");
+        }
+        this.fieldIndex = fieldIndex;
+        this.defaultTopicName = defaultTopicName;
+    }
+
+    /** Returns the string at {@code fieldIndex}, or the default topic (with a warning) when the tuple is too short. */
+    @Override
+    public String getTopic(Tuple tuple) {
+        if (fieldIndex >= tuple.size()) {
+            LOG.warn("Field index {} is out of bounds. Using default topic {}", fieldIndex, defaultTopicName);
+            return defaultTopicName;
+        }
+        return tuple.getString(fieldIndex);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
new file mode 100644
index 0000000..e90b26f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Picks the Kafka topic name from a named tuple field, falling back to a default topic.
+ */
+public class FieldNameTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -3903708904533396833L;
+    private static final Logger LOG = LoggerFactory.getLogger(FieldNameTopicSelector.class);
+
+    private final String fieldName;
+    private final String defaultTopicName;
+
+    /** Stores both arguments as given; neither is validated here. */
+    public FieldNameTopicSelector(String fieldName, String defaultTopicName) {
+        this.defaultTopicName = defaultTopicName;
+        this.fieldName = fieldName;
+    }
+
+    /** Returns the string in field {@code fieldName}, or the default topic (with a warning) when the tuple lacks it. */
+    @Override
+    public String getTopic(Tuple tuple) {
+        if (!tuple.contains(fieldName)) {
+            LOG.warn("Field {} Not Found. Returning default topic {}", fieldName, defaultTopicName);
+            return defaultTopicName;
+        }
+        return tuple.getStringByField(fieldName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..cb7fb44
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+/** Serializable strategy that picks a topic name for a tuple; {@link #getTopic(Tuple)} returns the chosen name. */
+public interface KafkaTopicSelector extends Serializable {
+    String getTopic(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
new file mode 100644
index 0000000..8ad527d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Based off of a given Kafka topic a ConsumerRecord came from it will be 
translated to a Storm tuple
+ * and emitted to a given stream
+ * @param <K> the key of the incoming Records
+ * @param <V> the value of the incoming Records
+ */
+public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = -121699733778988688L;
+    private final RecordTranslator<K,V> defaultTranslator;
+    private final Map<String, RecordTranslator<K,V>> topicToTranslator = new 
HashMap<>();
+    private final Map<String, Fields> streamToFields = new HashMap<>();
+    
+    /**
+     * Create a simple record translator that will use func to extract the 
fields of the tuple,
+     * named by fields, and emit them to stream. This will handle all topics 
not explicitly set
+     * elsewhere.
+     * @param func extracts and turns them into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     * @param stream the stream to emit these fields on.
+     */
+    public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> 
func, Fields fields, String stream) {
+        this(new SimpleRecordTranslator<>(func, fields, stream));
+    }
+    
+    /**
+     * Create a simple record translator that will use func to extract the 
fields of the tuple,
+     * named by fields, and emit them to the default stream. This will handle 
all topics not explicitly set
+     * elsewhere.
+     * @param func extracts and turns them into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     */
+    public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> 
func, Fields fields) {
+        this(new SimpleRecordTranslator<>(func, fields));
+    }
+    
+    /**
+     * @param defaultTranslator a translator that will be used for all topics 
not explicitly set
+     * elsewhere.
+     */
+    public ByTopicRecordTranslator(RecordTranslator<K,V> defaultTranslator) {
+        this.defaultTranslator = defaultTranslator;
+        //This shouldn't throw on a Check, because nothing is configured yet
+        cacheNCheckFields(defaultTranslator);
+    }
+    
+    /**
+     * Configure a translator for a given topic with tuples to be emitted to 
the default stream.
+     * @param topic the topic this should be used for
+     * @param func extracts and turns them into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     * @return this to be able to chain configuration
+     * @throws IllegalStateException if the topic is already registered to 
another translator
+     * @throws IllegalArgumentException if the Fields for the stream this 
emits to do not match any already configured Fields for the same stream
+     */
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, 
Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        return forTopic(topic, new SimpleRecordTranslator<>(func, fields));
+    }
+    
+    /**
+     * Configure a translator for a given topic.
+     * @param topic the topic this should be used for
+     * @param func extracts and turns them into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     * @param stream the stream to emit the tuples to.
+     * @return this to be able to chain configuration
+     * @throws IllegalStateException if the topic is already registered to 
another translator
+     * @throws IllegalArgumentException if the Fields for the stream this 
emits to do not match any already configured Fields for the same stream
+     */
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, 
Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        return forTopic(topic, new SimpleRecordTranslator<>(func, fields, 
stream));
+    }
+    
+    /**
+     * Configure a translator for a given kafka topic
+     * @param topic the topic this translator should handle
+     * @param translator the translator itself
+     * @return this to be able to chain configuration
+     * @throws IllegalStateException if the topic is already registered to 
another translator
+     * @throws IllegalArgumentException if the Fields for the stream this 
emits to do not match any already configured Fields for the same stream
+     */
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, 
RecordTranslator<K,V> translator) {
+        if (topicToTranslator.containsKey(topic)) {
+            throw new IllegalStateException("Topic " + topic + " is already 
registered");
+        }
+        cacheNCheckFields(translator);
+        topicToTranslator.put(topic, translator);
+        return this;
+    }
+    
+    private void cacheNCheckFields(RecordTranslator<K, V> translator) {
+        for (String stream : translator.streams()) {
+            Fields fromTrans = translator.getFieldsFor(stream);
+            Fields cached = streamToFields.get(stream);
+            if (cached != null && !fromTrans.equals(cached)) {
+                throw new IllegalArgumentException("Stream " + stream + " 
currently has Fields of " + cached + " which is not the same as those being 
added in " + fromTrans);
+            }
+            
+            if (cached == null) {
+                streamToFields.put(stream, fromTrans);
+            }
+        }
+    }
+
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        RecordTranslator<K, V> trans = topicToTranslator.get(record.topic());
+        if (trans == null) {
+            trans = defaultTranslator;
+        }
+        return trans.apply(record);
+    }
+
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return streamToFields.get(stream);
+    }
+    
+    @Override
+    public List<String> streams() {
+        return new ArrayList<>(streamToFields.keySet());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
new file mode 100644
index 0000000..4b0262b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/** Translates every record into the five values named by {@link #FIELDS}: topic, partition, offset, key and value. */
+public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = -5782462870112305750L;
+    public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
+    /** Builds the tuple values from the record, in the same order as {@link #FIELDS}. */
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        return new Values(record.topic(), record.partition(), record.offset(), record.key(), record.value());
+    }
+
+    /** Returns {@link #FIELDS} whatever stream is asked for. */
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return FIELDS;
+    }
+
+    /** Returns DEFAULT_STREAM, a constant not declared in this class (presumably inherited from RecordTranslator). */
+    @Override
+    public List<String> streams() {
+        return DEFAULT_STREAM;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
new file mode 100644
index 0000000..a631d96
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+/** A simple single-method function interface, kept to allow compatibility with non java 8 code bases.
+ * @param <V> the type of the value passed to {@link #apply}
+ * @param <R> the type of the result returned by {@link #apply}
+ */
+public interface Func<V, R> {
+    R apply(V record);
+}

Reply via email to