http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
new file mode 100644
index 0000000..afe8f74
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used compute the partition and topic level offset metrics
+ * <p>
+ * Partition level metrics are:
+ * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of 
the partition
+ * topicName/partition_{number}/latestTimeOffset //gives end offset of the 
partition
+ * topicName/partition_{number}/latestEmittedOffset //gives latest emitted 
offset of the partition from the spout
+ * topicName/partition_{number}/latestCompletedOffset //gives latest committed 
offset of the partition from the spout
+ * topicName/partition_{number}/spoutLag // the delta between the latest 
Offset and latestCompletedOffset
+ * topicName/partition_{number}/recordsInPartition // total number of records 
in the partition
+ * </p>
+ * <p>
+ * Topic level metrics are:
+ * topicName/totalEarliestTimeOffset //gives the total beginning offset of all 
the associated partitions of this spout
+ * topicName/totalLatestTimeOffset //gives the total end offset of all the 
associated partitions of this spout
+ * topicName/totalLatestEmittedOffset //gives the total latest emitted offset 
of all the associated partitions of this spout
+ * topicName/totalLatestCompletedOffset //gives the total latest committed 
offset of all the associated partitions of this spout
+ * topicName/spoutLag // total spout lag of all the associated partitions of 
this spout
+ * topicName/totalRecordsInPartitions //total number of records in all the 
associated partitions of this spout
+ * </p>
+ */
+public class KafkaOffsetMetric implements IMetric {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+    private final Supplier<Map<TopicPartition, OffsetManager>> 
offsetManagerSupplier;
+    private final Supplier<KafkaConsumer> consumerSupplier;
+
+    public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier 
consumerSupplier) {
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        this.consumerSupplier = consumerSupplier;
+    }
+
+    @Override
+    public Object getValueAndReset() {
+
+        Map<TopicPartition, OffsetManager> offsetManagers = 
offsetManagerSupplier.get();
+        KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+        if (offsetManagers == null || offsetManagers.isEmpty() || 
kafkaConsumer == null) {
+            LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is 
null.");
+            return null;
+        }
+
+        Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
+        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+        Map<TopicPartition, Long> beginningOffsets = 
kafkaConsumer.beginningOffsets(topicPartitions);
+        Map<TopicPartition, Long> endOffsets = 
kafkaConsumer.endOffsets(topicPartitions);
+        //map to hold partition level and topic level metrics
+        Map<String, Long> result = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetManager> entry : 
offsetManagers.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            OffsetManager offsetManager = entry.getValue();
+
+            long latestTimeOffset = endOffsets.get(topicPartition);
+            long earliestTimeOffset = beginningOffsets.get(topicPartition);
+
+            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+            long latestCompletedOffset = offsetManager.getCommittedOffset();
+            long spoutLag = latestTimeOffset - latestCompletedOffset;
+            long recordsInPartition =  latestTimeOffset - earliestTimeOffset;
+
+            String metricPath = topicPartition.topic()  + "/partition_" + 
topicPartition.partition();
+            result.put(metricPath + "/" + "spoutLag", spoutLag);
+            result.put(metricPath + "/" + "earliestTimeOffset", 
earliestTimeOffset);
+            result.put(metricPath + "/" + "latestTimeOffset", 
latestTimeOffset);
+            result.put(metricPath + "/" + "latestEmittedOffset", 
latestEmittedOffset);
+            result.put(metricPath + "/" + "latestCompletedOffset", 
latestCompletedOffset);
+            result.put(metricPath + "/" + "recordsInPartition", 
recordsInPartition);
+
+            TopicMetrics topicMetrics = 
topicMetricsMap.get(topicPartition.topic());
+            if (topicMetrics == null) {
+                topicMetrics = new TopicMetrics();
+                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
+            }
+
+            topicMetrics.totalSpoutLag += spoutLag;
+            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
+            topicMetrics.totalRecordsInPartitions += recordsInPartition;
+        }
+
+        for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+            String topic = e.getKey();
+            TopicMetrics topicMetrics = e.getValue();
+            result.put(topic + "/" + "totalSpoutLag", 
topicMetrics.totalSpoutLag);
+            result.put(topic + "/" + "totalEarliestTimeOffset", 
topicMetrics.totalEarliestTimeOffset);
+            result.put(topic + "/" + "totalLatestTimeOffset", 
topicMetrics.totalLatestTimeOffset);
+            result.put(topic + "/" + "totalLatestEmittedOffset", 
topicMetrics.totalLatestEmittedOffset);
+            result.put(topic + "/" + "totalLatestCompletedOffset", 
topicMetrics.totalLatestCompletedOffset);
+            result.put(topic + "/" + "totalRecordsInPartitions", 
topicMetrics.totalRecordsInPartitions);
+        }
+
+        LOG.debug("Metrics Tick: value : {}", result);
+        return result;
+    }
+
+    private class TopicMetrics {
+        long totalSpoutLag = 0;
+        long totalEarliestTimeOffset = 0;
+        long totalLatestTimeOffset = 0;
+        long totalLatestEmittedOffset = 0;
+        long totalLatestCompletedOffset = 0;
+        long totalRecordsInPartitions = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
new file mode 100644
index 0000000..6fa81aa
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Wraps transaction batch information
+ */
+public class KafkaTridentSpoutBatchMetadata implements Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
+    private static final TopicPartitionSerializer TP_SERIALIZER = new 
TopicPartitionSerializer();
+    
+    public static final String TOPIC_PARTITION_KEY = "tp";
+    public static final String FIRST_OFFSET_KEY = "firstOffset";
+    public static final String LAST_OFFSET_KEY = "lastOffset";
+    
+    // topic partition of this batch
+    private final TopicPartition topicPartition;  
+    // first offset of this batch
+    private final long firstOffset;               
+    // last offset of this batch
+    private final long lastOffset;
+
+    /**
+     * Builds a metadata object.
+     * @param topicPartition The topic partition
+     * @param firstOffset The first offset for the batch
+     * @param lastOffset The last offset for the batch
+     */
+    public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long 
firstOffset, long lastOffset) {
+        this.topicPartition = topicPartition;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+    }
+
+    /**
+     * Builds a metadata object from a non-empty set of records.
+     * @param topicPartition The topic partition the records belong to.
+     * @param consumerRecords The non-empty set of records.
+     */
+    public <K, V> KafkaTridentSpoutBatchMetadata(TopicPartition 
topicPartition, ConsumerRecords<K, V> consumerRecords) {
+        Validate.notNull(consumerRecords.records(topicPartition));
+        List<ConsumerRecord<K, V>> records = 
consumerRecords.records(topicPartition);
+        Validate.isTrue(!records.isEmpty(), "There must be at least one record 
in order to build metadata");
+        
+        this.topicPartition = topicPartition;
+        firstOffset = records.get(0).offset();
+        lastOffset = records.get(records.size() - 1).offset();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    public long getFirstOffset() {
+        return firstOffset;
+    }
+
+    public long getLastOffset() {
+        return lastOffset;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+    
+    /**
+     * Constructs a metadata object from a Map in the format produced by 
{@link #toMap() }.
+     * @param map The source map
+     * @return A new metadata object
+     */
+    public static KafkaTridentSpoutBatchMetadata fromMap(Map<String, Object> 
map) {
+        Map<String, Object> topicPartitionMap = (Map<String, 
Object>)map.get(TOPIC_PARTITION_KEY);
+        TopicPartition tp = TP_SERIALIZER.fromMap(topicPartitionMap);
+        return new KafkaTridentSpoutBatchMetadata(tp, 
((Number)map.get(FIRST_OFFSET_KEY)).longValue(),
+            ((Number)map.get(LAST_OFFSET_KEY)).longValue());
+    }
+    
+    /**
+     * Writes this metadata object to a Map so Trident can read/write it to 
Zookeeper.
+     */
+    public Map<String, Object> toMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put(TOPIC_PARTITION_KEY, TP_SERIALIZER.toMap(topicPartition));
+        map.put(FIRST_OFFSET_KEY, firstOffset);
+        map.put(LAST_OFFSET_KEY, lastOffset);
+        return map;
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{topicPartition=" + topicPartition +
+                ", firstOffset=" + firstOffset +
+                ", lastOffset=" + lastOffset +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
new file mode 100644
index 0000000..3b4aa4b
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaTridentSpoutEmitter<K, V> implements 
IOpaquePartitionedTridentSpout.Emitter<
+        List<Map<String, Object>>,
+        KafkaTridentSpoutTopicPartition,
+        Map<String, Object>>,
+        Serializable {
+
+    private static final long serialVersionUID = -7343927794834130435L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
+
+    // Kafka
+    private final KafkaConsumer<K, V> kafkaConsumer;
+
+    // Bookkeeping
+    private final KafkaTridentSpoutManager<K, V> kafkaManager;
+    // set of topic-partitions for which first poll has already occurred, and 
the first polled txid
+    private final Map<TopicPartition, Long> firstPollTransaction = new 
HashMap<>(); 
+
+    // Declare some KafkaTridentSpoutManager references for convenience
+    private final long pollTimeoutMs;
+    private final KafkaSpoutConfig.FirstPollOffsetStrategy 
firstPollOffsetStrategy;
+    private final RecordTranslator<K, V> translator;
+    private final Timer refreshSubscriptionTimer;
+    private final TopicPartitionSerializer tpSerializer = new 
TopicPartitionSerializer();
+
+    private TopologyContext topologyContext;
+
+    /**
+     * Create a new Kafka spout emitter.
+     * @param kafkaManager The Kafka consumer manager to use
+     * @param topologyContext The topology context
+     * @param refreshSubscriptionTimer The timer for deciding when to recheck 
the subscription
+     */
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> 
kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
+        this.kafkaConsumer = 
kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
+        this.kafkaManager = kafkaManager;
+        this.topologyContext = topologyContext;
+        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
+        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
+
+        final KafkaSpoutConfig<K, V> kafkaSpoutConfig = 
kafkaManager.getKafkaSpoutConfig();
+        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
+        this.firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    /**
+     * Creates instance of this class with default 500 millisecond refresh 
subscription timer
+     */
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> 
kafkaManager, TopologyContext topologyContext) {
+        this(kafkaManager, topologyContext, new Timer(500,
+                
kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), 
TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, 
TridentCollector collector,
+            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, 
Object> lastBatch) {
+
+        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = 
{}], [lastBatchMetadata = {}], [collector = {}]",
+                tx, currBatchPartition, lastBatch, collector);
+
+        final TopicPartition currBatchTp = 
currBatchPartition.getTopicPartition();
+        final Set<TopicPartition> assignments = kafkaConsumer.assignment();
+        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? 
null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
+        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
+        Collection<TopicPartition> pausedTopicPartitions = 
Collections.emptySet();
+
+        if (assignments == null || 
!assignments.contains(currBatchPartition.getTopicPartition())) {
+            LOG.warn("SKIPPING processing batch [transaction = {}], 
[currBatchPartition = {}], [lastBatchMetadata = {}], " +
+                            "[collector = {}] because it is not part of the 
assignments {} of consumer instance [{}] " +
+                            "of consumer group [{}]", tx, currBatchPartition, 
lastBatch, collector, assignments,
+                    kafkaConsumer, 
kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
+        } else {
+            try {
+                // pause other topic-partitions to only poll from current 
topic-partition
+                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
+
+                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
+
+                // poll
+                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                    
kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
+                }
+
+                final ConsumerRecords<K, V> records = 
kafkaConsumer.poll(pollTimeoutMs);
+                LOG.debug("Polled [{}] records from Kafka.", records.count());
+
+                if (!records.isEmpty()) {
+                    emitTuples(collector, records);
+                    // build new metadata
+                    currentBatch = new 
KafkaTridentSpoutBatchMetadata(currBatchTp, records);
+                }
+            } finally {
+                kafkaConsumer.resume(pausedTopicPartitions);
+                LOG.trace("Resumed topic-partitions {}", 
pausedTopicPartitions);
+            }
+            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition 
= {}], [lastBatchMetadata = {}], " +
+                    "[currBatchMetadata = {}], [collector = {}]", tx, 
currBatchPartition, lastBatch, currentBatch, collector);
+        }
+
+        return currentBatch == null ? null : currentBatch.toMap();
+    }
+
+    private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> 
records) {
+        for (ConsumerRecord<K, V> record : records) {
+            final List<Object> tuple = translator.apply(record);
+            collector.emit(tuple);
+            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
+        }
+    }
+
+    /**
+     * Determines the offset of the next fetch. Will use the 
firstPollOffsetStrategy if this is the first poll for the topic partition.
+     * Otherwise the next offset will be one past the last batch, based on 
lastBatchMeta.
+     * 
+     * <p>lastBatchMeta should only be null when the previous txid was not 
emitted (e.g. new topic),
+     * it is the first poll for the spout instance, or it is a replay of the 
first txid this spout emitted on this partition.
+     * In the second case, there are either no previous transactions, or the 
MBC is still committing them
+     * and they will fail because this spout did not emit the corresponding 
batches. If it had emitted them, the meta could not be null. 
+     * In any case, the lastBatchMeta should never be null if this is not the 
first poll for this spout instance.
+     *
+     * @return the offset of the next fetch
+     */
+    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata 
lastBatchMeta, long transactionId) {
+        if (isFirstPoll(tp, transactionId)) {
+            if (firstPollOffsetStrategy == EARLIEST) {
+                LOG.debug("First poll for topic partition [{}], seeking to 
partition beginning", tp);
+                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == LATEST) {
+                LOG.debug("First poll for topic partition [{}], seeking to 
partition end", tp);
+                kafkaConsumer.seekToEnd(Collections.singleton(tp));
+            } else if (lastBatchMeta != null) {
+                LOG.debug("First poll for topic partition [{}], using last 
batch metadata", tp);
+                kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // 
seek next offset after last offset from previous batch
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+                LOG.debug("First poll for topic partition [{}] with no last 
batch metadata, seeking to partition beginning", tp);
+                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
+                LOG.debug("First poll for topic partition [{}] with no last 
batch metadata, seeking to partition end", tp);
+                kafkaConsumer.seekToEnd(Collections.singleton(tp));
+            }
+            firstPollTransaction.put(tp, transactionId);
+        } else {
+            kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // 
seek next offset after last offset from previous batch
+            LOG.debug("First poll for topic partition [{}], using last batch 
metadata", tp);
+        }
+
+        final long fetchOffset = kafkaConsumer.position(tp);
+        LOG.debug("Set [fetchOffset = {}]", fetchOffset);
+        return fetchOffset;
+    }
+
+    private boolean isFirstPoll(TopicPartition tp, long txid) {
+        // The first poll is either the "real" first transaction, or a replay 
of the first transaction
+        return !firstPollTransaction.containsKey(tp) || 
firstPollTransaction.get(tp) == txid;
+    }
+
+    // returns paused topic-partitions.
+    private Collection<TopicPartition> pauseTopicPartitions(TopicPartition 
excludedTp) {
+        final Set<TopicPartition> pausedTopicPartitions = new 
HashSet<>(kafkaConsumer.assignment());
+        LOG.debug("Currently assigned topic-partitions {}", 
pausedTopicPartitions);
+        pausedTopicPartitions.remove(excludedTp);
+        kafkaConsumer.pause(pausedTopicPartitions);
+        LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
+        return pausedTopicPartitions;
+    }
+
+    @Override
+    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> 
partitionResponsibilities) {
+        LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
+                "No action taken by this method for topic partitions {}", 
partitionResponsibilities);
+    }
+
+    /**
+     * Computes ordered list of topic-partitions for this task taking into 
consideration that topic-partitions
+     * for this task must be assigned to the Kafka consumer running on this 
task.
+     *
+     * @param allPartitionInfo list of all partitions as returned by {@link 
KafkaTridentSpoutOpaqueCoordinator}
+     * @return ordered list of topic partitions for this task
+     */
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final 
List<Map<String, Object>> allPartitionInfo) {
+        List<TopicPartition> allTopicPartitions = new ArrayList<>();
+        for(Map<String, Object> map : allPartitionInfo) {
+            allTopicPartitions.add(tpSerializer.fromMap(map));
+        }
+        final List<KafkaTridentSpoutTopicPartition> allPartitions = 
newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
+        LOG.debug("Returning all topic-partitions {} across all tasks. Current 
task index [{}]. Total tasks [{}] ",
+                allPartitions, topologyContext.getThisTaskIndex(), 
getNumTasks());
+        return allPartitions;
+    }
+
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int 
taskId, int numTasks,
+        List<Map<String, Object>> allPartitionInfo) {
+        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
+        LOG.debug("Consumer [{}], running on task with index [{}], has 
assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
+        final List<KafkaTridentSpoutTopicPartition> taskTps = 
newKafkaTridentSpoutTopicPartitions(assignedTps);
+        LOG.debug("Returning topic-partitions {} for task with index [{}]", 
taskTps, taskId);
+        return taskTps;
+    }
+
+    private List<KafkaTridentSpoutTopicPartition> 
newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
+        final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps 
== null ? 0 : tps.size());
+        if (tps != null) {
+            for (TopicPartition tp : tps) {
+                LOG.trace("Added topic-partition [{}]", tp);
+                kttp.add(new KafkaTridentSpoutTopicPartition(tp));
+            }
+        }
+        return kttp;
+    }
+
+    private int getNumTasks() {
+        return 
topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
+    }
+
+    @Override
+    public void close() {
+        kafkaConsumer.close();
+        LOG.debug("Closed");
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
new file mode 100644
index 0000000..26db5c9
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
+public class KafkaTridentSpoutManager<K, V> implements Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTridentSpoutManager.class);
+
+    // Kafka
+    private transient KafkaConsumer<K, V> kafkaConsumer;
+
+    // Bookkeeping
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    // Declare some KafkaSpoutConfig references for convenience
+    private Fields fields;
+
+    /**
+     * Create a KafkaConsumer manager for the trident spout.
+     * @param kafkaSpoutConfig The consumer config
+     */
+    public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
+        this.fields = getFields();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext 
context) {
+        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
+
+        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new 
KafkaSpoutConsumerRebalanceListener(), context);
+        return kafkaConsumer;
+    }
+
+    KafkaConsumer<K, V> getKafkaConsumer() {
+        return kafkaConsumer;
+    }
+
+    Set<TopicPartition> getTopicPartitions() {
+        return 
KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
+    }
+
+    final Fields getFields() {
+        if (fields == null) {
+            RecordTranslator<K, V> translator = 
kafkaSpoutConfig.getTranslator();
+            Fields fs = null;
+            for (String stream : translator.streams()) {
+                if (fs == null) {
+                    fs = translator.getFieldsFor(stream);
+                } else {
+                    if (!fs.equals(translator.getFieldsFor(stream))) {
+                        throw new IllegalArgumentException("Trident Spouts do 
not support multiple output Fields");
+                    }
+                }
+            }
+            fields = fs;
+        }
+        LOG.debug("OutputFields = {}", fields);
+        return fields;
+    }
+
+    KafkaSpoutConfig<K, V> getKafkaSpoutConfig() {
+        return kafkaSpoutConfig;
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaConsumer=" + kafkaConsumer +
+                ", kafkaSpoutConfig=" + kafkaSpoutConfig +
+                '}';
+    }
+
+    private class KafkaSpoutConsumerRebalanceListener implements 
ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
+            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, 
topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
+            
KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+            
KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
+            LOG.info("Partitions reassignment. [consumer-group={}, 
consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
new file mode 100644
index 0000000..ecc9219
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaTridentSpoutOpaque<K,V> implements 
IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
+        KafkaTridentSpoutTopicPartition, Map<String, Object>> {
+    private static final long serialVersionUID = -8003272486566259640L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
+
+    private final KafkaTridentSpoutManager<K, V> kafkaManager;
+
+    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
+        this(new KafkaTridentSpoutManager<>(conf));
+    }
+    
+    public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> 
kafkaManager) {
+        this.kafkaManager = kafkaManager;
+        LOG.debug("Created {}", this.toString());
+    }
+
+    @Override
+    public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, 
Map<String, Object>> getEmitter(
+            Map conf, TopologyContext context) {
+        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
+    }
+
+    @Override
+    public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, 
TopologyContext context) {
+        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        final Fields outputFields = kafkaManager.getFields();
+        LOG.debug("OutputFields = {}", outputFields);
+        return outputFields;
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
new file mode 100644
index 0000000..449e24b
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements 
IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
+        Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
+
+    private final TopicPartitionSerializer tpSerializer = new 
TopicPartitionSerializer();
+    private final KafkaTridentSpoutManager<K,V> kafkaManager;
+
+    public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> 
kafkaManager) {
+        this.kafkaManager = kafkaManager;
+        LOG.debug("Created {}", this.toString());
+    }
+
+    @Override
+    public boolean isReady(long txid) {
+        LOG.debug("isReady = true");
+        return true;    // the "old" trident kafka spout always returns true, 
like this
+    }
+
+    @Override
+    public List<Map<String, Object>> getPartitionsForBatch() {
+        final ArrayList<TopicPartition> topicPartitions = new 
ArrayList<>(kafkaManager.getTopicPartitions());
+        LOG.debug("TopicPartitions for batch {}", topicPartitions);
+        List<Map<String, Object>> tps = new ArrayList<>();
+        for(TopicPartition tp : topicPartitions) {
+            tps.add(tpSerializer.toMap(tp));
+        }
+        return tps;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("Closed"); // the "old" trident kafka spout is no op like 
this
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
new file mode 100644
index 0000000..b020bea
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+import java.io.Serializable;
+
+/**
+ * {@link ISpoutPartition} that wraps {@link TopicPartition} information
+ */
+public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, 
Serializable {
+    private TopicPartition topicPartition;
+
+    public KafkaTridentSpoutTopicPartition(String topic, int partition) {
+        this(new TopicPartition(topic, partition));
+    }
+
+    public KafkaTridentSpoutTopicPartition(TopicPartition topicPartition) {
+        this.topicPartition = topicPartition;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+
+    @Override
+    public String getId() {
+        return topicPartition.topic() + "@" + topicPartition.partition();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        KafkaTridentSpoutTopicPartition that = 
(KafkaTridentSpoutTopicPartition) o;
+
+        return topicPartition != null ? 
topicPartition.equals(that.topicPartition) : that.topicPartition == null;
+    }
+
+    @Override
+    public int hashCode() {
+        return topicPartition != null ? topicPartition.hashCode() : 0;
+    }
+
+    @Override
+    public String toString()  {
+        return topicPartition.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
new file mode 100644
index 0000000..2d50ca7
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+public enum KafkaTridentSpoutTopicPartitionRegistry {
+    INSTANCE;
+
+    private Set<TopicPartition> topicPartitions;
+
+    KafkaTridentSpoutTopicPartitionRegistry() {
+        this.topicPartitions = new LinkedHashSet<>();
+    }
+
+    public Set<TopicPartition> getTopicPartitions() {
+        return Collections.unmodifiableSet(topicPartitions);
+    }
+
+    public void addAll(Collection<? extends TopicPartition> topicPartitions) {
+        this.topicPartitions.addAll(topicPartitions);
+    }
+
+    public void removeAll(Collection<? extends TopicPartition> 
topicPartitions) {
+        this.topicPartitions.removeAll(topicPartitions);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
new file mode 100644
index 0000000..6e1c587
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Map;
+
+// TODO
+public class KafkaTridentSpoutTransactional implements 
IPartitionedTridentSpout {
+    @Override
+    public Coordinator getCoordinator(Map conf, TopologyContext context) {
+        return null;
+    }
+
+    @Override
+    public Emitter getEmitter(Map conf, TopologyContext context) {
+        return null;
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
new file mode 100644
index 0000000..50e78f0
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+
+public class TopicPartitionSerializer {
+
+    public static final String TOPIC_PARTITION_TOPIC_KEY = "topic";
+    public static final String TOPIC_PARTITION_PARTITION_KEY = "partition";
+
+    /**
+     * Serializes the given TopicPartition to Map so Trident can serialize it 
to JSON.
+     */
+    public Map<String, Object> toMap(TopicPartition topicPartition) {
+        Map<String, Object> topicPartitionMap = new HashMap<>();
+        topicPartitionMap.put(TOPIC_PARTITION_TOPIC_KEY, 
topicPartition.topic());
+        topicPartitionMap.put(TOPIC_PARTITION_PARTITION_KEY, 
topicPartition.partition());
+        return topicPartitionMap;
+    }
+
+    /**
+     * Deserializes the given map into a TopicPartition. The map keys are 
expected to be those produced by
+     * {@link #toMap(org.apache.kafka.common.TopicPartition)}.
+     */
+    public TopicPartition fromMap(Map<String, Object> map) {
+        return new TopicPartition((String) map.get(TOPIC_PARTITION_TOPIC_KEY),
+            ((Number) map.get(TOPIC_PARTITION_PARTITION_KEY)).intValue());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
new file mode 100644
index 0000000..2e2c13b
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState<K, V> implements State {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaState.class);
+
+    private KafkaProducer<K, V> producer;
+
+    private TridentTupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+
+    public TridentKafkaState<K, V> 
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaState<K, V> withKafkaTopicSelector(KafkaTopicSelector 
selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is Noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is Noop.");
+    }
+
+    /**
+     * Prepare this State.
+     * @param options The KafkaProducer config.
+     */
+    public void prepare(Properties options) {
+        Objects.requireNonNull(mapper, "mapper can not be null");
+        Objects.requireNonNull(topicSelector, "topicSelector can not be null");
+        producer = new KafkaProducer<>(options);
+    }
+
+    /**
+     * Write the given tuples to Kafka.
+     * @param tuples The tuples to write.
+     * @param collector The Trident collector.
+     */
+    public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) {
+        String topic = null;
+        try {
+            long startTime = System.currentTimeMillis();
+            int numberOfRecords = tuples.size();
+            List<Future<RecordMetadata>> futures = new 
ArrayList<>(numberOfRecords);
+            for (TridentTuple tuple : tuples) {
+                topic = topicSelector.getTopic(tuple);
+                V messageFromTuple = mapper.getMessageFromTuple(tuple);
+                K keyFromTuple = mapper.getKeyFromTuple(tuple);
+
+                if (topic != null) {
+                    if (messageFromTuple != null) {
+                        Future<RecordMetadata> result = producer.send(new 
ProducerRecord<>(topic, keyFromTuple, messageFromTuple));
+                        futures.add(result);
+                    } else {
+                        LOG.warn("skipping Message with Key {} as message was 
null", keyFromTuple);
+                    }
+
+                } else {
+                    LOG.warn("skipping key = {}, topic selector returned 
null.", keyFromTuple);
+                }
+            }
+
+            int emittedRecords = futures.size();
+            List<ExecutionException> exceptions = new 
ArrayList<>(emittedRecords);
+            for (Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (ExecutionException e) {
+                    exceptions.add(e);
+                }
+            }
+
+            if (exceptions.size() > 0) {
+                StringBuilder errorMsg = new StringBuilder("Could not retrieve 
result for messages ");
+                errorMsg.append(tuples).append(" from topic = ").append(topic)
+                        .append(" because of the following 
exceptions:").append(System.lineSeparator());
+
+                for (ExecutionException exception : exceptions) {
+                    errorMsg = 
errorMsg.append(exception.getMessage()).append(System.lineSeparator());
+                }
+                String message = errorMsg.toString();
+                LOG.error(message);
+                throw new FailedException(message);
+            }
+            long latestTime = System.currentTimeMillis();
+            LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", 
emittedRecords, latestTime - startTime, topic);
+
+        } catch (Exception ex) {
+            String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
+            LOG.warn(errorMsg, ex);
+            throw new FailedException(errorMsg, ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
new file mode 100644
index 0000000..35620de
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class TridentKafkaStateFactory<K, V> implements StateFactory {
+
+    private static final long serialVersionUID = -3613240970062343385L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties producerProperties = new Properties();
+
+    public TridentKafkaStateFactory<K, V> 
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaStateFactory<K, V> 
withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public TridentKafkaStateFactory<K, V> withProducerProperties(Properties 
props) {
+        this.producerProperties = props;
+        return this;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int 
partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitonIndex={}, numpartitions={}", 
partitionIndex, numPartitions);
+        TridentKafkaState<K, V> state = new TridentKafkaState<>();
+        state.withKafkaTopicSelector(this.topicSelector)
+            .withTridentTupleToKafkaMapper(this.mapper);
+        state.prepare(producerProperties);
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
new file mode 100644
index 0000000..19e3d33
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.trident;
+
+import java.util.List;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class TridentKafkaStateUpdater<K, V> extends 
BaseStateUpdater<TridentKafkaState<K, V>> {
+
+    private static final long serialVersionUID = 3352659585225274402L;
+
+    @Override
+    public void updateState(TridentKafkaState<K, V> state, List<TridentTuple> 
tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..2d04971
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements 
TridentTupleToKafkaMapper {
+
+    public final String keyFieldName;
+    public final String msgFieldName;
+
+    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String 
msgFieldName) {
+        this.keyFieldName = keyFieldName;
+        this.msgFieldName = msgFieldName;
+    }
+
+    @Override
+    public K getKeyFromTuple(TridentTuple tuple) {
+        return (K) tuple.getValueByField(keyFieldName);
+    }
+
+    @Override
+    public V getMessageFromTuple(TridentTuple tuple) {
+        return (V) tuple.getValueByField(msgFieldName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
new file mode 100644
index 0000000..28c6c89
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+public interface TridentTupleToKafkaMapper<K,V>  extends Serializable {
+    K getKeyFromTuple(TridentTuple tuple);
+    V getMessageFromTuple(TridentTuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..607c996
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class DefaultTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -1172454882072591493L;
+    private final String topicName;
+
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    @Override
+    public String getTopic(TridentTuple tuple) {
+        return topicName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..012a6c7
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+public interface KafkaTopicSelector extends Serializable {
+    String getTopic(TridentTuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
new file mode 100644
index 0000000..93b1040
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+public class KafkaUnit {
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private ZkUtils zkUtils;
+    private KafkaProducer<String, String> producer;
+    private static final String ZK_HOST = "127.0.0.1";
+    private static final String KAFKA_HOST = "127.0.0.1";
+    private static final int KAFKA_PORT = 9092;
+
+    public KafkaUnit() {
+    }
+
+    public void setUp() throws IOException {
+        // setup ZK
+        zkServer = new EmbeddedZookeeper();
+        String zkConnect = ZK_HOST + ":" + zkServer.port();
+        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+        zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", 
String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        MockTime mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // setup default Producer
+        createProducer();
+    }
+
+    public void tearDown() {
+        closeProducer();
+        kafkaServer.shutdown();
+        zkUtils.close();
+        zkServer.shutdown();
+    }
+
+    public void createTopic(String topicName) {
+        AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+    }
+
+    public int getKafkaPort() {
+        return KAFKA_PORT;
+    }
+
+    private void createProducer() {
+        Properties producerProps = new Properties();
+        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + 
KAFKA_PORT);
+        producerProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        producer = new KafkaProducer<>(producerProps);
+    }
+
+    public void createProducer(Serializer keySerializer, Serializer 
valueSerializer) {
+        Properties producerProps = new Properties();
+        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + 
KAFKA_PORT);
+        producer = new KafkaProducer<>(producerProps, keySerializer, 
valueSerializer);
+    }
+
+    public void sendMessage(ProducerRecord producerRecord) throws 
InterruptedException, ExecutionException, TimeoutException {
+        producer.send(producerRecord).get(10, TimeUnit.SECONDS);
+    }
+
+    private void closeProducer() {
+        producer.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
new file mode 100644
index 0000000..6e90c9d
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.junit.rules.ExternalResource;
+
+import java.io.IOException;
+
+
+public class KafkaUnitRule extends ExternalResource {
+
+    private final KafkaUnit kafkaUnit;
+
+    public KafkaUnitRule() {
+        this.kafkaUnit = new KafkaUnit();
+    }
+
+    @Override
+    public void before() throws IOException {
+        kafkaUnit.setUp();
+    }
+
+    @Override
+    public void after() {
+        kafkaUnit.tearDown();
+    }
+
+    public KafkaUnit getKafkaUnit() {
+        return this.kafkaUnit;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
new file mode 100644
index 0000000..8c8a945
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBoltTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
+    
+    @SuppressWarnings({ "unchecked", "serial" })
+    @Test
+    public void testSimple() {
+        final KafkaProducer<String, String> producer = 
mock(KafkaProducer.class);
+        when(producer.send((ProducerRecord<String,String>)any(), 
(Callback)any())).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable 
{
+                Callback c = (Callback)invocation.getArguments()[1];
+                c.onCompletion(null, null);
+                return null;
+            }
+        });
+        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
+            @Override
+            protected KafkaProducer<String, String> mkProducer(Properties 
props) {
+                return producer;
+            }
+        };
+        bolt.withTopicSelector("MY_TOPIC");
+        
+        OutputCollector collector = mock(OutputCollector.class);
+        TopologyContext context = mock(TopologyContext.class);
+        Map<String, Object> conf = new HashMap<>();
+        bolt.prepare(conf, context, collector);
+        MkTupleParam param = new MkTupleParam();
+        param.setFields("key", "message");
+        Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), 
param);
+        bolt.execute(testTuple);
+        verify(producer).send(argThat(new 
ArgumentMatcher<ProducerRecord<String, String>>() {
+            @Override
+            public boolean matches(Object argument) {
+                LOG.info("GOT {} ->", argument);
+                ProducerRecord<String, String> arg = (ProducerRecord<String, 
String>) argument;
+                LOG.info("  {} {} {}", arg.topic(), arg.key(), arg.value());
+                return "MY_TOPIC".equals(arg.topic()) &&
+                        "KEY".equals(arg.key()) &&
+                        "VALUE".equals(arg.value());
+            }
+        }), any(Callback.class));
+        verify(collector).ack(testTuple);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
new file mode 100644
index 0000000..abc58f0
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+
+public class ByTopicRecordTranslatorTest {
+    public static Func<ConsumerRecord<String, String>, List<Object>> 
JUST_KEY_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.key());
+        }
+    };
+    
+    public static Func<ConsumerRecord<String, String>, List<Object>> 
JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    };
+    
+    public static Func<ConsumerRecord<String, String>, List<Object>> 
KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.key(), record.value());
+        }
+    };
+    
+    @Test
+    public void testBasic() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new 
Fields("key"));
+        trans.forTopic("TOPIC 1", JUST_VALUE_FUNC, new Fields("value"), 
"value-stream");
+        trans.forTopic("TOPIC 2", KEY_VALUE_FUNC, new Fields("key", "value"), 
"key-value-stream");
+        HashSet<String> expectedStreams = new HashSet<>();
+        expectedStreams.add("default");
+        expectedStreams.add("value-stream");
+        expectedStreams.add("key-value-stream");
+        assertEquals(expectedStreams, new HashSet<>(trans.streams()));
+
+        ConsumerRecord<String, String> cr1 = new ConsumerRecord<>("TOPIC 
OTHER", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("key"), trans.getFieldsFor("default"));
+        assertEquals(Arrays.asList("THE KEY"), trans.apply(cr1));
+        
+        ConsumerRecord<String, String> cr2 = new ConsumerRecord<>("TOPIC 1", 
100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("value"), trans.getFieldsFor("value-stream"));
+        assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr2));
+        
+        ConsumerRecord<String, String> cr3 = new ConsumerRecord<>("TOPIC 2", 
100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("key", "value"), 
trans.getFieldsFor("key-value-stream"));
+        assertEquals(Arrays.asList("THE KEY", "THE VALUE"), trans.apply(cr3));
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void testFieldCollision() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new 
Fields("key"));
+        trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"));
+    }
+    
+    @Test(expected = IllegalStateException.class)
+    public void testTopicCollision() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new 
Fields("key"));
+        trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"), "foo1");
+        trans.forTopic("foo", KEY_VALUE_FUNC, new Fields("key", "value"), 
"foo2");
+    }
+
+}

Reply via email to