Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch 9c8930036 -> 4348548aa


http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 95b2199..b178687 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,17 +17,24 @@
  */
 package org.apache.storm.kafka.spout.builders;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.*;
+import org.apache.storm.kafka.spout.Func;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.Subscription;
 import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import org.apache.storm.tuple.Values;
 
 public class SingleTopicKafkaSpoutConfiguration {
 
@@ -42,53 +49,44 @@ public class SingleTopicKafkaSpoutConfiguration {
 
     public static StormTopology getTopologyKafkaSpout(int port) {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(port).build()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
-        return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+    private static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.key(), r.value());
+        }
+    };
+
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
     }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
-        return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<String, String>("127.0.0.1:" + port, subscription));
     }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
-        return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
-            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+    public static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+        return config
+            .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
+                new Fields("topic", "key", "value"), STREAM)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+            .setRetry(getNoDelayRetryService())
+            .setOffsetCommitPeriodMs(10_000)
             .setFirstPollOffsetStrategy(EARLIEST)
             .setMaxUncommittedOffsets(250)
-            .setPollTimeoutMs(1000)
-            .build();
-    }
-
-    protected static KafkaSpoutRetryService getRetryService() {
-        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
-            KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
-    }
-
-    protected static Map<String, Object> getKafkaConsumerProps(int port) {
-        Map<String, Object> props = new HashMap<>();
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
-        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("max.poll.records", "5");
-        return props;
-    }
-
-    protected static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-            new TopicKeyValueTupleBuilder<String, String>(TOPIC))
-            .build();
+            .setPollTimeoutMs(1000);
     }
 
-    public static KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "key", "value");
-        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC}) // contents of topics test sent to test_stream
-            .build();
+    protected static KafkaSpoutRetryService getNoDelayRetryService() {
+        /**
+         * Retry in a tight loop (keep unit tests fast).
+         */
+        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
     }
 }
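
For context: the helper now hands back a KafkaSpoutConfig.Builder rather than a finished config, so each test can override a single setting before building. A minimal usage sketch (kafkaPort and the shorter commit period are illustrative, not part of this commit):

    KafkaSpoutConfig<String, String> config = SingleTopicKafkaSpoutConfiguration
        .createKafkaSpoutConfigBuilder(kafkaPort)
        .setOffsetCommitPeriodMs(500) // hypothetical per-test override
        .build();
    KafkaSpout<String, String> spout = new KafkaSpout<>(config);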

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
deleted file mode 100644
index 4f20b58..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout.builders;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicKeyValueTupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.key(),
-                consumerRecord.value());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
new file mode 100644
index 0000000..9972d4c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.NoSuchElementException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class OffsetManagerTest {
+    private static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"threadName\":\"Thread-20\"}";
+
+    @Rule
+    public ExpectedException expect = ExpectedException.none();
+
+    private final long initialFetchOffset = 0;
+    private final TopicPartition testTp = new TopicPartition("testTopic", 0);
+    private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset);
+
+    @Test
+    public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() {
+        /* If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets.
+         * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked,
+         * then any prior message will have been emitted at least once.
+         * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped.
+         */
+        manager.addToEmitMsgs(0);
+        manager.addToEmitMsgs(1);
+        manager.addToEmitMsgs(2);
+        //3, 4 compacted away
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 6);
+        manager.addToAckMsgs(getMessageId(initialFetchOffset));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 1));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 2));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
+
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 3));
+
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
+
+        assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
+            manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
+    }
+
+    @Test
+    public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() {
+        //0-4 compacted away
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 6);
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
+
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA), is(nullValue()));
+
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
+
+        assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
+            manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
+    }
+
+    @Test
+    public void testFindNextCommittedOffsetWithNoAcks() {
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithOneAck() {
+        /*
+         * The KafkaConsumer commitSync API docs: "The committed offset should be the next message your application will consume, i.e.
+         * lastProcessedMessageOffset + 1."
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithAckedOffsetGap() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should cover the sequential acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() {
+        /**
+         * If topic compaction is enabled in Kafka, some offsets may be deleted.
+         * We distinguish this case from regular gaps in the acked offset sequence caused by out-of-order acking
+         * by checking that offsets in the gap have been emitted at some point previously.
+         * If they haven't, then they can't exist in Kafka, since the spout emits tuples in order.
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist",
+            nextCommitOffset.offset(), is(initialFetchOffset + 3));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithUnackedOffsetGap() {
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
+        OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10);
+        emitAndAckMessage(getMessageId(0));
+        OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testCommit() {
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+
+        long committedMessages = manager.commit(new OffsetAndMetadata(initialFetchOffset + 2));
+
+        assertThat("Should have committed all messages to the left of the earliest uncommitted offset", committedMessages, is(2L));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset), is(false));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset + 1)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset + 1), is(false));
+        assertThat("The uncommitted message should still be in the acked list", manager.contains(getMessageId(initialFetchOffset + 2)), is(true));
+        assertThat("The uncommitted message should still be in the emitted list", manager.containsEmitted(initialFetchOffset + 2), is(true));
+    }
+
+    private KafkaSpoutMessageId getMessageId(long offset) {
+        return new KafkaSpoutMessageId(testTp, offset);
+    }
+
+    private void emitAndAckMessage(KafkaSpoutMessageId msgId) {
+        manager.addToEmitMsgs(msgId.offset());
+        manager.addToAckMsgs(msgId);
+    }
+
+    @Test
+    public void testGetNthUncommittedOffsetAfterCommittedOffset() {
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        manager.addToEmitMsgs(initialFetchOffset + 2);
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 30);
+
+        assertThat("The third uncommitted offset should be 5", manager.getNthUncommittedOffsetAfterCommittedOffset(3), is(initialFetchOffset + 5L));
+        assertThat("The fourth uncommitted offset should be 30", manager.getNthUncommittedOffsetAfterCommittedOffset(4), is(initialFetchOffset + 30L));
+
+        expect.expect(NoSuchElementException.class);
+        manager.getNthUncommittedOffsetAfterCommittedOffset(5);
+    }
+
+    @Test
+    public void testCommittedFlagSetOnCommit() throws Exception {
+        assertFalse(manager.hasCommitted());
+        manager.commit(mock(OffsetAndMetadata.class));
+        assertTrue(manager.hasCommitted());
+    }
+}
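
For context: the tests above drive OffsetManager through the same sequence the spout uses at runtime. A condensed sketch of that lifecycle, assuming the method names shown in the tests (meta stands in for the spout's commit metadata string):

    OffsetManager om = new OffsetManager(tp, 0);             // tp = TopicPartition under management
    om.addToEmitMsgs(0);                                     // spout emitted the tuple for offset 0
    om.addToAckMsgs(new KafkaSpoutMessageId(tp, 0));         // that tuple was acked
    OffsetAndMetadata next = om.findNextCommitOffset(meta);  // called on the commit timer
    if (next != null) {
        om.commit(next); // prunes acked/emitted state below next.offset()
    }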

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
new file mode 100644
index 0000000..9a2a682
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.ManualPartitioner;
+import org.apache.storm.kafka.spout.TopicFilter;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class ManualPartitionSubscriptionTest {
+
+    @Test
+    public void testCanReassignPartitions() {
+        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+        TopicFilter filterMock = mock(TopicFilter.class);
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class);
+        TopologyContext contextMock = mock(TopologyContext.class);
+        ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock);
+
+        List<TopicPartition> onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        List<TopicPartition> twoPartitions = new ArrayList<>();
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
+        when(partitionerMock.partition(anyList(), any(TopologyContext.class)))
+            .thenReturn(onePartition)
+            .thenReturn(twoPartitions);
+
+        //Set the first assignment
+        subscription.subscribe(consumerMock, listenerMock, contextMock);
+
+        InOrder inOrder = inOrder(consumerMock, listenerMock);
+        inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition));
+
+        reset(consumerMock, listenerMock);
+
+        when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition));
+
+        //Update to set the second assignment
+        subscription.refreshAssignment();
+
+        //The partition revocation hook must be called before the new partitions are assigned to the consumer,
+        //to allow the revocation hook to commit offsets for the revoked partitions.
+        inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition));
+        inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions));
+    }
+
+}
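
For context: ManualPartitioner is only mocked above, so its contract is implied rather than shown. A hypothetical implementation of the partition(List, TopologyContext) call the mock stubs out, purely to make that contract concrete (round-robin by task index; not part of this commit):

    public class RoundRobinManualPartitioner implements ManualPartitioner {
        @Override
        public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
            // Each spout task takes every Nth partition, offset by its own task index,
            // so the full partition list is covered with no overlap between tasks.
            int thisTask = context.getThisTaskIndex();
            int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
            List<TopicPartition> mine = new ArrayList<>();
            for (int i = thisTask; i < allPartitions.size(); i += totalTasks) {
                mine.add(allPartitions.get(i));
            }
            return mine;
        }
    }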

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 5a78137..40a2d3c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -18,45 +18,43 @@
 
 package org.apache.storm.kafka.spout.test;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import org.apache.storm.tuple.Values;
 
 public class KafkaSpoutTopologyMainNamedTopics {
-    private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
+    private static final String TOPIC_2_STREAM = "test_2_stream";
+    private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
     private static final String[] TOPICS = new String[]{"test","test1","test2"};
 
-
     public static void main(String[] args) throws Exception {
         new KafkaSpoutTopologyMainNamedTopics().runMain(args);
     }
 
     protected void runMain(String[] args) throws Exception {
         if (args.length == 0) {
-            submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
+            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
         } else {
-            submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig());
+            submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig());
         }
 
     }
@@ -87,16 +85,34 @@ public class KafkaSpoutTopologyMainNamedTopics {
         return config;
     }
 
-    protected StormTopology getTopolgyKafkaSpout() {
+    protected StormTopology getTopologyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
-        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())
+          .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM)
+          .shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
+        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
         return tp.createTopology();
     }
 
-    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
+    public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value());
+        }
+    };
+
+    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
+        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
+                TOPIC_PART_OFF_KEY_VALUE_FUNC,
+                new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM);
+        trans.forTopic(TOPICS[2],
+                TOPIC_PART_OFF_KEY_VALUE_FUNC,
+                new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM);
+        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS)
+                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+                .setRetry(getRetryService())
+                .setRecordTranslator(trans)
                 .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
                 .setMaxUncommittedOffsets(250)
@@ -107,30 +123,4 @@ public class KafkaSpoutTopologyMainNamedTopics {
             return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                     TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
     }
-
-    protected Map<String,Object> getKafkaConsumerProps() {
-        Map<String, Object> props = new HashMap<>();
-//        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
-        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        return props;
-    }
-
-    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
-                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
-                .build();
-    }
-
-    protected KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
-        final Fields outputFields1 = new Fields("topic", "partition", "offset");
-        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
-                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
-                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
-                .build();
-    }
 }
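
For context: ByTopicRecordTranslator replaces the old KafkaSpoutStreams/KafkaSpoutTuplesBuilder pair; the constructor arguments define the default translation, and forTopic(...) adds per-topic overrides. Downstream bolts can still tell the sources apart by stream id. A hypothetical bolt-side check (not part of this commit), using the stream names declared above:

    @Override
    public void execute(Tuple input) {
        if ("test_2_stream".equals(input.getSourceStreamId())) {
            // tuple built from a "test2" record via the forTopic override
        } else {
            // tuple built from a "test" or "test1" record via the default translation
        }
    }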

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
index c362a2b..f0004ea 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -18,45 +18,51 @@
 
 package org.apache.storm.kafka.spout.test;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutStream;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderWildcardTopics;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.util.regex.Pattern;
+import org.apache.storm.tuple.Values;
 
 public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics {
     private static final String STREAM = "test_wildcard_stream";
-    private static final String TOPIC_WILDCARD_PATTERN = "test[1|2]";
+    private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test[1|2]");
 
     public static void main(String[] args) throws Exception {
         new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
     }
 
-    protected StormTopology getTopolgyKafkaSpout() {
+    protected StormTopology getTopologyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
 
-    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderWildcardTopics<>(getTupleBuilder());
-    }
-
-    protected KafkaSpoutTupleBuilder<String, String> getTupleBuilder() {
-        return new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
-    }
-
-    protected KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
-        final KafkaSpoutStream kafkaSpoutStream = new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN));
-        return new KafkaSpoutStreamsWildcardTopics(kafkaSpoutStream);
+    public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value());
+        }
+    };
+
+    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
+        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN)
+                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+                .setRetry(getRetryService())
+                .setRecordTranslator(TOPIC_PART_OFF_KEY_VALUE_FUNC,
+                        new Fields("topic", "partition", "offset", "key", "value"), STREAM)
+                .setOffsetCommitPeriodMs(10_000)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setMaxUncommittedOffsets(250)
+                .build();
     }
 }
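
For context: the subscription pattern is now a precompiled java.util.regex.Pattern rather than a String. One subtlety: inside a character class, '|' is a literal, so "test[1|2]" matches "test1", "test2", and also the unlikely topic name "test|"; "test[12]" would be the tighter form, though the behavior is identical for the topics used here. A quick check:

    Pattern p = Pattern.compile("test[1|2]");
    System.out.println(p.matcher("test1").matches()); // true
    System.out.println(p.matcher("test2").matches()); // true
    System.out.println(p.matcher("test|").matches()); // also true, since '|' is literal in a character class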

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
deleted file mode 100644
index ca65177..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicTest2TupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.partition(),
-                consumerRecord.offset());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
deleted file mode 100644
index 4c55aa1..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicsTest0Test1TupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.partition(),
-                consumerRecord.offset(),
-                consumerRecord.key(),
-                consumerRecord.value());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
new file mode 100644
index 0000000..da87a03
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
+import org.apache.storm.shade.org.json.simple.JSONValue;
+import org.junit.Test;
+
+public class KafkaTridentSpoutBatchMetadataTest {
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testMetadataIsRoundTripSerializableWithJsonSimple() throws Exception {
+        /**
+         * Tests that the metadata object can be converted to and from a Map. This is needed because Trident metadata is written to
+         * Zookeeper as JSON with the json-simple library, so the spout converts the metadata to Map before returning it to Trident.
+         * It is important that all map entries are of types json-simple knows about,
+         * since otherwise the library just calls toString on them, which will likely produce invalid JSON.
+         */
+        TopicPartition tp = new TopicPartition("topic", 0);
+        long startOffset = 10;
+        long endOffset = 20;
+
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, startOffset, endOffset);
+        Map<String, Object> map = metadata.toMap();
+        Map deserializedMap = (Map)JSONValue.parseWithException(JSONValue.toJSONString(map));
+        KafkaTridentSpoutBatchMetadata deserializedMetadata = KafkaTridentSpoutBatchMetadata.fromMap(deserializedMap);
+        assertThat(deserializedMetadata.getTopicPartition(), is(metadata.getTopicPartition()));
+        assertThat(deserializedMetadata.getFirstOffset(), is(metadata.getFirstOffset()));
+        assertThat(deserializedMetadata.getLastOffset(), is(metadata.getLastOffset()));
+    }
+
+    @Test
+    public void testCreateMetadataFromRecords() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        long firstOffset = 15;
+        long lastOffset = 55;
+        ConsumerRecords<?, ?> records = new ConsumerRecords<>(Collections.singletonMap(tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, (int) (lastOffset - firstOffset + 1))));
+
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, records);
+        assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset));
+        assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset));
+    }
+
+}
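
For context: the Map round trip matters because Trident persists batch metadata to Zookeeper as JSON via json-simple, which natively serializes only Maps, Lists, Strings, Numbers, Booleans and null; any other value falls back to a raw toString(). A sketch of the failure mode the first test guards against (hypothetical metadata map, not part of this commit):

    Map<String, Object> bad = Collections.singletonMap("tp", new TopicPartition("topic", 0));
    // json-simple does not know TopicPartition, so it writes the raw toString():
    System.out.println(JSONValue.toJSONString(bad)); // {"tp":topic-0} -- not valid JSON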

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..393dd2c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="WARN">
+            <AppenderRef ref="Console"/>
+        </Root>
+        <Logger name="org.apache.storm.kafka" level="INFO" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+    </Loggers>
+</Configuration>
\ No newline at end of file
