http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
new file mode 100644
index 0000000..f6de6a8
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+
+public class KafkaSpoutRetryExponentialBackoffTest {
+    
+    private final TopicPartition testTopic = new TopicPartition("topic", 0);
+    private final TopicPartition testTopic2 = new TopicPartition("other-topic", 0);
+
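+    //The KafkaSpoutRetryExponentialBackoff constructor takes (initialDelay, delayPeriod, maxRetries, maxDelay),
+    //as the tests below exercise; the two helpers build a no-wait and a one-second-wait service.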
+    private KafkaSpoutRetryExponentialBackoff createNoWaitRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), 1, TimeInterval.seconds(0));
+    }
+
+    private KafkaSpoutRetryExponentialBackoff createOneSecondWaitRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(1), TimeInterval.seconds(0), 1, TimeInterval.seconds(1));
+    }
+
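+    //Key and value are left null; the retry service presumably identifies a record only by topic, partition and offset.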
+    private ConsumerRecord<String, String> createRecord(TopicPartition tp, long offset) {
+        return new ConsumerRecord<>(tp.topic(), tp.partition(), offset, null, null);
+    }
+    
+    @Test
+    public void testCanScheduleRetry() {
+        KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService();
+        long offset = 0;
+        KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+        msgId.incrementNumFails();
+
+        boolean scheduled = retryService.schedule(msgId);
+
+        assertThat("The service must schedule the message for retry", 
scheduled, is(true));
+        KafkaSpoutMessageId retrievedMessageId = 
retryService.getMessageId(createRecord(testTopic, offset));
+        assertThat("The service should return the original message id when 
asked for the same tp/offset twice", retrievedMessageId, sameInstance(msgId));
+        assertThat(retryService.isScheduled(msgId), is(true));
+        assertThat(retryService.isReady(msgId), is(true));
+        assertThat(retryService.readyMessageCount(), is(1));
+        assertThat(retryService.earliestRetriableOffsets(), 
is(Collections.singletonMap(testTopic, msgId.offset())));
+    }
+
+    @Test
+    public void testCanRescheduleRetry() {
+        try (SimulatedTime time = new SimulatedTime()) {
+
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+            Time.advanceTime(500);
+            boolean scheduled = retryService.schedule(msgId);
+
+            assertThat("The service must be able to reschedule an already 
scheduled id", scheduled, is(true));
+            Time.advanceTime(500);
+            assertThat("The message should not be ready for retry yet since it 
was rescheduled", retryService.isReady(msgId), is(false));
+            assertThat(retryService.isScheduled(msgId), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), 
is(Collections.<TopicPartition, Long>emptyMap()));
+            assertThat(retryService.readyMessageCount(), is(0));
+            Time.advanceTime(500);
+            assertThat("The message should be ready for retry once the full 
delay has passed", retryService.isReady(msgId), is(true));
+            assertThat(retryService.isScheduled(msgId), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), 
is(Collections.singletonMap(testTopic, msgId.offset())));
+            assertThat(retryService.readyMessageCount(), is(1));
+        }
+    }
+    
+    @Test
+    public void testCannotContainMultipleSchedulesForId() {
+        try (SimulatedTime time = new SimulatedTime()) {
+
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+            Time.advanceTime(500);
+            boolean scheduled = retryService.schedule(msgId);
+            
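+            //A reschedule should replace the existing entry rather than add a second one,
+            //so a single remove must be enough to clear the message from the service.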
+            retryService.remove(msgId);
+            assertThat("The message should no longer be scheduled", 
retryService.isScheduled(msgId), is(false));
+            Time.advanceTime(500);
+            assertThat("The message should not be ready for retry because it 
isn't scheduled", retryService.isReady(msgId), is(false));
+        }
+    }
+
+    @Test
+    public void testCanRemoveRetry() {
+        KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService();
+        long offset = 0;
+        KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+        msgId.incrementNumFails();
+
+        retryService.schedule(msgId);
+        boolean removed = retryService.remove(msgId);
+
+        assertThat(removed, is(true));
+        assertThat(retryService.isScheduled(msgId), is(false));
+        assertThat(retryService.isReady(msgId), is(false));
+        assertThat(retryService.earliestRetriableOffsets(), is(Collections.<TopicPartition, Long>emptyMap()));
+        assertThat(retryService.readyMessageCount(), is(0));
+    }
+
+    @Test
+    public void testCanHandleMultipleTopics() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are messages from multiple partitions scheduled
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+
+            KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(createRecord(testTopic, offset));
+            KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(createRecord(testTopic2, offset));
+            msgIdTp1.incrementNumFails();
+            msgIdTp2.incrementNumFails();
+
+            boolean scheduledOne = retryService.schedule(msgIdTp1);
+            Time.advanceTime(500);
+            boolean scheduledTwo = retryService.schedule(msgIdTp2);
+
+            //The retry schedules for two messages should be unrelated
+            assertThat(scheduledOne, is(true));
+            assertThat(retryService.isScheduled(msgIdTp1), is(true));
+            assertThat(scheduledTwo, is(true));
+            assertThat(retryService.isScheduled(msgIdTp2), is(true));
+            assertThat(retryService.isReady(msgIdTp1), is(false));
+            assertThat(retryService.isReady(msgIdTp2), is(false));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdTp1), is(true));
+            assertThat(retryService.isReady(msgIdTp2), is(false));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, offset)));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdTp2), is(true));
+            Map<TopicPartition, Long> earliestOffsets = new HashMap<>();
+            earliestOffsets.put(testTopic, offset);
+            earliestOffsets.put(testTopic2, offset);
+            assertThat(retryService.earliestRetriableOffsets(), is(earliestOffsets));
+
+            //The service must be able to remove retry schedules for unnecessary partitions
+            retryService.retainAll(Collections.singleton(testTopic2));
+            assertThat(retryService.isScheduled(msgIdTp1), is(false));
+            assertThat(retryService.isScheduled(msgIdTp2), is(true));
+            assertThat(retryService.isReady(msgIdTp1), is(false));
+            assertThat(retryService.isReady(msgIdTp2), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic2, offset)));
+        }
+    }
+
+    @Test
+    public void testCanHandleMultipleMessagesOnPartition() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are multiple messages scheduled on a partition
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+
+            KafkaSpoutMessageId msgIdEarliest = retryService.getMessageId(createRecord(testTopic, offset));
+            KafkaSpoutMessageId msgIdLatest = retryService.getMessageId(createRecord(testTopic, offset + 1));
+            msgIdEarliest.incrementNumFails();
+            msgIdLatest.incrementNumFails();
+
+            retryService.schedule(msgIdEarliest);
+            Time.advanceTime(500);
+            retryService.schedule(msgIdLatest);
+
+            assertThat(retryService.isScheduled(msgIdEarliest), is(true));
+            assertThat(retryService.isScheduled(msgIdLatest), is(true));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdEarliest), is(true));
+            assertThat(retryService.isReady(msgIdLatest), is(false));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset())));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdEarliest), is(true));
+            assertThat(retryService.isReady(msgIdLatest), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset())));
+
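+            //Even when several messages are ready, earliestRetriableOffsets should keep pointing at the lowest ready offset until that message is removed.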
+            retryService.remove(msgIdEarliest);
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdLatest.offset())));
+        }
+    }
+
+    @Test
+    public void testMaxRetries() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            int maxRetries = 3;
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), maxRetries, TimeInterval.seconds(0));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            for (int i = 0; i < maxRetries; i++) {
+                msgId.incrementNumFails();
+            }
+
+            //Should be allowed to retry 3 times, in addition to original try
+            boolean scheduled = retryService.schedule(msgId);
+
+            assertThat(scheduled, is(true));
+            assertThat(retryService.isScheduled(msgId), is(true));
+
+            retryService.remove(msgId);
+            msgId.incrementNumFails();
+            boolean rescheduled = retryService.schedule(msgId);
+
+            assertThat("The message should not be allowed to retry once the 
limit is reached", rescheduled, is(false));
+            assertThat(retryService.isScheduled(msgId), is(false));
+        }
+    }
+
+    @Test
+    public void testMaxDelay() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            int maxDelaySecs = 2;
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(500), TimeInterval.seconds(0), 1, TimeInterval.seconds(maxDelaySecs));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+
+            Time.advanceTimeSecs(maxDelaySecs);
+            assertThat("The message should be ready for retry after the max 
delay", retryService.isReady(msgId), is(true));
+        }
+    }
+
+    private void validateBackoff(int expectedBackoffSeconds, KafkaSpoutMessageId msgId, KafkaSpoutRetryExponentialBackoff retryService) {
+        Time.advanceTimeSecs(expectedBackoffSeconds - 1);
+        assertThat("The message should not be ready for retry until the 
backoff has expired", retryService.isReady(msgId), is(false));
+        Time.advanceTimeSecs(1);
+        assertThat(retryService.isReady(msgId), is(true));
+    }
+
+    @Test
+    public void testExponentialBackoff() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(4), Integer.MAX_VALUE, TimeInterval.seconds(Integer.MAX_VALUE));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            msgId.incrementNumFails();
+            msgId.incrementNumFails(); //First failure is the initial delay, so not interesting
+
+            //Expecting 4*2^(failCount-1)
+            List<Integer> expectedBackoffsSecs = Arrays.asList(new Integer[]{8, 16, 32});
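+            //With delayPeriod = 4s: failCount 2 -> 4*2^1 = 8s, failCount 3 -> 4*2^2 = 16s, failCount 4 -> 4*2^3 = 32s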
+            
+            for (Integer expectedBackoffSecs : expectedBackoffsSecs) {
+                retryService.schedule(msgId);
+
+                Time.advanceTimeSecs(expectedBackoffSecs - 1);
+                assertThat("The message should not be ready for retry until 
backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), 
is(false));
+                Time.advanceTimeSecs(1);
+                assertThat("The message should be ready for retry once backoff 
" + expectedBackoffSecs + " has expired", retryService.isReady(msgId), 
is(true));
+
+                msgId.incrementNumFails();
+                retryService.remove(msgId);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
new file mode 100644
index 0000000..569becf
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+
+public class KafkaSpoutRetryLimitTest {
+    
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+    
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
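+    //With maxRetries = 0 a failed tuple is immediately considered exhausted, so the spout should ack it and allow its offset to be committed.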
+    
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+    
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+        consumerMock = mock(KafkaConsumer.class);
+    }
+    
+    @Test
+    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+        //Spout should ack failed messages after they hit the retry limit
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            int lastOffset = 3;
+            int numRecords = lastOffset + 1;
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords));
+            
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords<>(records));
+            
+            for (int i = 0; i < numRecords; i++) {
+                spout.nextTuple();
+            }
+            
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
+            
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.fail(messageId);
+            }
+
+            // Advance time and then trigger call to kafka consumer commit
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+            inOrder.verify(consumerMock).poll(anyLong());
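+            //The in-order verification checks that the commit happens before the next poll, so the consumer resumes from the committed position.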
+
+            //verify that offset 4 was committed for the given TopicPartition, since processing should resume at 4.
+            assertTrue(commitCapture.getValue().containsKey(partition));
+            assertEquals(lastOffset + 1, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset());
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
new file mode 100644
index 0000000..0bf9219
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.utils.Time;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertEquals;
+
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.hamcrest.Matchers;
+
+public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
+    private final int maxPollRecords = 10;
+    private final int maxRetries = 3;
+
+    public KafkaSpoutSingleTopicTest() {
+        super(2_000);
+    }
+
+    @Override
+    KafkaSpoutConfig<String, String> createSpoutConfig() {
+        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+            KafkaSpoutConfig.builder("127.0.0.1:" + 
kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+            .build();
+    }
+
+    @Test
+    public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception {
+        final int messageCount = maxPollRecords * 2;
+        prepareSpout(messageCount);
+
+        //Emit all messages and fail the first one while acking the rest
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture());
+        List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
+        for (int i = 1; i < messageIds.size(); i++) {
+            spout.ack(messageIds.get(i));
+        }
+        KafkaSpoutMessageId failedTuple = messageIds.get(0);
+        spout.fail(failedTuple);
+
+        //Replay the failed tuple. The zero-delay retry service makes it retriable immediately, so no time advance is needed.
+        reset(collectorMock);
+        spout.nextTuple();
+        ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyString(), anyList(), failedIdReplayCaptor.capture());
+
+        assertThat("Expected replay of failed tuple", 
failedIdReplayCaptor.getValue(), is(failedTuple));
+
+        /* Ack the tuple, and commit.
+         * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll.
+         */
+        reset(collectorMock);
+        Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
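+        //Advancing simulated time past TIMER_DELAY_MS + the commit period expires the commit timer, so the next nextTuple() call triggers commitSync.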
+        spout.ack(failedIdReplayCaptor.getValue());
+        spout.nextTuple();
+        verify(consumerSpy).commitSync(commitCapture.capture());
+
+        Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
+        TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
+        assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount));
+
+        /* Verify that the following acked (now committed) tuples are not emitted again
+         * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened,
+         * this verifies that the spout keeps the consumer position ahead of the committed offset when committing
+         */
+        //Just do a few polls to check that nothing more is emitted
+        for(int i = 0; i < 3; i++) {
+            spout.nextTuple();
+        }
+        verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+    }
+
+    @Test
+    public void testShouldContinueWithSlowDoubleAcks() throws Exception {
+        final int messageCount = 20;
+        prepareSpout(messageCount);
+
+        //play 1st tuple
+        ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        //Emit some more messages
+        for(int i = 0; i < messageCount / 2; i++) {
+            spout.nextTuple();
+        }
+
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        //Emit any remaining messages
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        //Verify that all messages are emitted, ack all the messages
+        ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class);
+        verify(collectorMock, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            messageIds.capture());
+        for(Object id : messageIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldEmitAllMessages() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+            ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+            verify(collectorMock).emit(
+                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+                eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
+                    Integer.toString(i),
+                    Integer.toString(i))),
+                messageId.capture());
+            spout.ack(messageId.getValue());
+            reset(collectorMock);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayInOrderFailedMessages() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //play and ack 1 tuple
+        ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdAcked.capture());
+        spout.ack(messageIdAcked.getValue());
+        reset(collectorMock);
+
+        //play and fail 1 tuple
+        ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdFailed.capture());
+        spout.fail(messageIdFailed.getValue());
+        reset(collectorMock);
+
+        //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
+        //All messages except the first acked message should have been emitted
+        verify(collectorMock, times(messageCount - 1)).emit(
+            eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            remainingMessageIds.capture());
+        for(Object id : remainingMessageIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //play 1st tuple
+        ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToFail.capture());
+        reset(collectorMock);
+
+        //play 2nd tuple
+        ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToAck.capture());
+        reset(collectorMock);
+
+        //ack 2nd tuple
+        spout.ack(messageIdToAck.getValue());
+        //fail 1st tuple
+        spout.fail(messageIdToFail.getValue());
+
+        //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+        //All messages except the first acked message should have been emitted
+        verify(collectorMock, times(messageCount - 1)).emit(
+            eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            remainingIds.capture());
+        for(Object id : remainingIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
+        //The spout must reemit retriable tuples, even if they fail out of order.
+        //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //play all tuples
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIds.capture());
+        reset(collectorMock);
+        //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
+        List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
+        spout.fail(capturedMessageIds.get(5));
+        spout.fail(capturedMessageIds.get(3));
+        spout.nextTuple();
+        spout.fail(capturedMessageIds.get(2));
+
+        //Check that the spout will reemit all 3 failed tuples and no other tuples
+        ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        verify(collectorMock, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture());
+        Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
+        expectedReemitIds.add(capturedMessageIds.get(5));
+        expectedReemitIds.add(capturedMessageIds.get(3));
+        expectedReemitIds.add(capturedMessageIds.get(2));
+        assertThat("Expected reemits to be the 3 failed tuples", new 
HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
+    }
+
+    @Test
+    public void testShouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+        //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+        final int messageCount = 1;
+        prepareSpout(messageCount);
+
+        //Emit and fail the same tuple until we've reached retry limit
+        for (int i = 0; i <= maxRetries; i++) {
+            ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            spout.nextTuple();
+            verify(collectorMock).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture());
+            KafkaSpoutMessageId msgId = messageIdFailed.getValue();
+            spout.fail(msgId);
+            assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+            reset(collectorMock);
+        }
+
+        //Verify that the tuple is not emitted again
+        spout.nextTuple();
+        verify(collectorMock, never()).emit(anyString(), anyListOf(Object.class), anyObject());
+    }
+
+    @Test
+    public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+        //Nothing is assigned yet, should emit nothing
+        spout.nextTuple();
+        verify(collectorMock, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+        Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS);
+
+        //The new partition should be discovered and the message should be emitted
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+    }
+
+    @Test
+    public void testOffsetMetrics() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        Map<String, Long> offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+        // the offset of the last available message + 1.
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalRecordsInPartitions").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+        //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for (int i = 0; i < messageCount; i++) {
+            nextTuple_verifyEmitted_ack_resetCollector(i);
+        }
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+
+        offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+        //latest offset
+        assertEquals(9, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+        // offset where processing will resume upon spout restart
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
new file mode 100644
index 0000000..a860cef
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.junit.Test;
+
+import java.util.regex.Pattern;
+
+import static org.mockito.Mockito.when;
+
+public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAbstractTest {
+
+    public KafkaSpoutTopologyDeployActivateDeactivateTest() {
+        super(2_000);
+    }
+
+    @Override
+    KafkaSpoutConfig<String, String> createSpoutConfig() {
+        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+            KafkaSpoutConfig.builder("127.0.0.1:" + 
kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+            .build();
+    }
+
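+    //As the test names suggest, the EARLIEST strategy should only be enforced on the first deployment of a topology id;
+    //on activate/deactivate cycles and restarts with the same id the spout resumes from the last committed offset.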
+    @Test
+    public void test_FirstPollStrategy_Earliest_NotEnforced_OnTopologyActivateDeactivate() throws Exception {
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0);
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        consumerSpy = createConsumerSpy();
+
+        spout.activate();
+
+        nextTuple_verifyEmitted_ack_resetCollector(1);
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_NotEnforced_OnPartitionReassignment() throws Exception {
+        when(topologyContext.getStormId()).thenReturn("topology-1");
+
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0);
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        // Restart topology with the same topology id, which mimics the behavior of partition reassignment
+        setUp();
+        // Initialize spout using the same populated data (i.e same kafkaUnitRule)
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+        nextTuple_verifyEmitted_ack_resetCollector(1);
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_Enforced_OnlyOnTopologyDeployment() throws Exception {
+        when(topologyContext.getStormId()).thenReturn("topology-1");
+
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0);
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        // Restart topology with a different topology id
+        setUp();
+        when(topologyContext.getStormId()).thenReturn("topology-2");
+        // Initialize spout using the same populated data (i.e same kafkaUnitRule)
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for (int i = 0; i < messageCount; i++) {
+            nextTuple_verifyEmitted_ack_resetCollector(i);
+        }
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
new file mode 100755
index 0000000..b90a49d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockitoAnnotations;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+
+public class MaxUncommittedOffsetTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    private final TopologyContext topologyContext = mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000;
+    private final int numMessages = 100;
+    private final int maxUncommittedOffsets = 10;
+    private final int maxPollRecords = 5;
+    private final int initialRetryDelaySecs = 60;
+    private final KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+        .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+        .setMaxUncommittedOffsets(maxUncommittedOffsets)
+        .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+            1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
+        .build();
+    private KafkaSpout<String, String> spout;
+
+
+
+    @Before
+    public void setUp() {
+        //This is because the tests are checking that a hard cap of maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets exists
+        //so Kafka must be able to return more messages than that in order for the tests to be meaningful
+        assertThat("Current tests require numMessages >= 2*maxUncommittedOffsets", numMessages, greaterThanOrEqualTo(maxUncommittedOffsets * 2));
+        //This is to verify that a low maxPollRecords does not interfere with reemitting failed tuples
+        //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
+        assertThat("Current tests require maxPollRecords <= maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
+        MockitoAnnotations.initMocks(this);
+        spout = new KafkaSpout<>(spoutConfig);
+    }
+
+    private void prepareSpout(int msgCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
+    }
+
+    private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception {
+        assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets));
+        //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
+        prepareSpout(messageCount);
+
+        //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        verify(collector, times(maxUncommittedOffsets)).emit(
+            anyString(),
+            anyList(),
+            messageIds.capture());
+        return messageIds;
+    }
+
+    @Test
+    public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffsetsDueToCommit() throws Exception {
+        //The spout must respect maxUncommittedOffsets after committing a set of records
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Ack all emitted messages and commit them
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.ack(messageId);
+            }
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+
+            spout.nextTuple();
+
+            //Now check that the spout will emit another maxUncommittedOffsets messages
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+            verify(collector, times(maxUncommittedOffsets)).emit(
+                anyString(),
+                anyList(),
+                anyObject());
+        }
+    }
+
+    @Test
+    public void testNextTupleWillRespectMaxUncommittedOffsetsWhenThereAreAckedUncommittedTuples() throws Exception {
+        //The spout must respect maxUncommittedOffsets even if some tuples have been acked but not committed
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Fail all emitted messages except the last one. Try to commit.
+            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+            for (int i = 0; i < messageIdList.size() - 1; i++) {
+                spout.fail(messageIdList.get(i));
+            }
+            spout.ack(messageIdList.get(messageIdList.size() - 1));
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            spout.nextTuple();
+
+            //Now check that the spout will not emit anything else since nothing has been committed
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+
+            verify(collector, times(0)).emit(
+                anyString(),
+                anyList(),
+                anyObject());
+        }
+    }
+
+    private void failAllExceptTheFirstMessageThenCommit(ArgumentCaptor<KafkaSpoutMessageId> messageIds) {
+        //Fail all emitted messages except the first. Commit the first.
+        List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+        for (int i = 1; i < messageIdList.size(); i++) {
+            spout.fail(messageIdList.get(i));
+        }
+        spout.ack(messageIdList.get(0));
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        spout.nextTuple();
+    }
+
+    @Test
+    public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages() throws Exception {
+        /*
+        For each partition the spout is allowed to retry all tuples between the committed offset, and maxUncommittedOffsets ahead.
+        It is not allowed to retry tuples past that limit.
+        This makes the actual limit per partition maxUncommittedOffsets + maxPollRecords - 1,
+        reached if the tuple at the maxUncommittedOffsets limit is the earliest retriable tuple,
+        or if the spout is 1 tuple below the limit, and receives a full maxPollRecords tuples in the poll.
+         */
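+        //With the values used here that cap is maxUncommittedOffsets + maxPollRecords - 1 = 10 + 5 - 1 = 14 offsets per partition.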
+
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Fail only the last tuple
+            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+            KafkaSpoutMessageId failedMessageId = messageIdList.get(messageIdList.size() - 1);
+            spout.fail(failedMessageId);
+
+            //Offset 0 to maxUncommittedOffsets - 2 are pending, maxUncommittedOffsets - 1 is failed but not retriable
+            //The spout should not emit any more tuples.
+            spout.nextTuple();
+            verify(collector, never()).emit(
+                anyString(),
+                anyList(),
+                any(KafkaSpoutMessageId.class));
+
+            //Allow the failed record to retry
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            for (int i = 0; i < maxPollRecords; i++) {
+                spout.nextTuple();
+            }
+            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                secondRunMessageIds.capture());
+            reset(collector);
+            assertThat(secondRunMessageIds.getAllValues().get(0), is(failedMessageId));
+            
+            //There should now be maxUncommittedOffsets + maxPollRecords emitted in all.
+            //Fail the last emitted tuple and verify that the spout won't retry it because it's above the emit limit.
+            spout.fail(secondRunMessageIds.getAllValues().get(secondRunMessageIds.getAllValues().size() - 1));
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            spout.nextTuple();
+            verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+        }
+    }
+
+    @Test
+    public void testNextTupleWillAllowRetryForTuplesBelowEmitLimit() throws Exception {
+        /*
+        For each partition the spout is allowed to retry all tuples between the committed offset and maxUncommittedOffsets ahead.
+        It must retry tuples within that limit, even if more tuples were emitted.
+         */
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            failAllExceptTheFirstMessageThenCommit(messageIds);
+
+            //Offset 0 is committed, 1 to maxUncommittedOffsets - 1 are failed but not retriable
+            //The spout should now emit another maxPollRecords messages
+            //This is allowed because the committed message brings the numUncommittedOffsets below the cap
+            for (int i = 0; i < maxUncommittedOffsets; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                secondRunMessageIds.capture());
+            reset(collector);
+
+            List<Long> firstRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+                firstRunOffsets.add(msgId.offset());
+            }
+            List<Long> secondRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : secondRunMessageIds.getAllValues()) {
+                secondRunOffsets.add(msgId.offset());
+            }
+            assertThat("Expected the newly emitted messages to have no overlap 
with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
+
+            //Offset 0 is committed, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
+            //Fail the last tuples so only offset 0 is not failed.
+            //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples
+            //for all the failed tuples that are within maxUncommittedOffsets tuples of the committed offset
+            //This means 1 to maxUncommittedOffsets, but not maxUncommittedOffsets+1...maxUncommittedOffsets+maxPollRecords-1
+            for (KafkaSpoutMessageId msgId : secondRunMessageIds.getAllValues()) {
+                spout.fail(msgId);
+            }
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+            ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxUncommittedOffsets)).emit(
+                anyString(),
+                anyList(),
+                thirdRunMessageIds.capture());
+            reset(collector);
+
+            List<Long> thirdRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : thirdRunMessageIds.getAllValues()) {
+                thirdRunOffsets.add(msgId.offset());
+            }
+
+            assertThat("Expected the emitted messages to be retries of the 
failed tuples from the first batch, plus the first failed tuple from the second 
batch", thirdRunOffsets, 
everyItem(either(isIn(firstRunOffsets)).or(is(secondRunMessageIds.getAllValues().get(0).offset()))));
+        }
+    }
+}
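
The leading comments in the two tests above describe the per-partition emit cap. A minimal sketch with hypothetical values (maxUncommittedOffsets = 10 and maxPollRecords = 5; neither number appears in this diff) makes the worst-case arithmetic concrete:

    public class EmitCapSketch {
        public static void main(String[] args) {
            int maxUncommittedOffsets = 10; // hypothetical config value
            int maxPollRecords = 5;         // hypothetical config value
            // The spout may poll again whenever it is below the cap, so the worst
            // case is sitting 1 tuple under the cap and receiving a full poll:
            int worstCase = (maxUncommittedOffsets - 1) + maxPollRecords;
            System.out.println(worstCase); // 14, i.e. maxUncommittedOffsets + maxPollRecords - 1
        }
    }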

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedSubscriptionTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedSubscriptionTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedSubscriptionTest.java
new file mode 100644
index 0000000..56bfddf
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedSubscriptionTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+public class NamedSubscriptionTest {
+
+    private NamedSubscription namedSubscription;
+
+    @Test
+    public void testGetTopicsStringWithOneTopic() throws Exception {
+        Collection<String> topics = new ArrayList<>();
+        topics.add("test-topic1");
+
+        namedSubscription = new NamedSubscription(topics);
+
+        Assert.assertEquals("test-topic1", namedSubscription.getTopicsString());
+    }
+
+    @Test
+    public void testGetTopicsStringWithManyTopics() throws Exception {
+        Collection<String> topics = new ArrayList<>();
+        topics.add("test-topic1");
+        topics.add("test-topic2");
+        topics.add("test-topic3");
+
+        namedSubscription = new NamedSubscription(topics);
+
+        Assert.assertEquals("test-topic1,test-topic2,test-topic3", namedSubscription.getTopicsString());
+    }
+
+}
\ No newline at end of file
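
The assertions above pin getTopicsString() down as a plain comma join with no spaces. A standalone sketch of the asserted output (this mirrors what the tests expect, not necessarily how NamedSubscription builds the string internally):

    import java.util.Arrays;
    import java.util.List;

    public class TopicsStringSketch {
        public static void main(String[] args) {
            List<String> topics = Arrays.asList("test-topic1", "test-topic2", "test-topic3");
            // Comma-separated, no spaces, insertion order preserved:
            System.out.println(String.join(",", topics)); // test-topic1,test-topic2,test-topic3
        }
    }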

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
new file mode 100644
index 0000000..fe3325c
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.kafka.spout.NamedTopicFilter;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NamedTopicFilterTest {
+
+    private KafkaConsumer<?, ?> consumerMock;
+    
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+    
+    @Test
+    public void testFilter() {
+        String matchingTopicOne = "test-1";
+        String matchingTopicTwo = "test-11";
+        String unmatchedTopic = "unmatched";
+        
+        NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo);
+        
+        when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+        List<PartitionInfo> partitionTwoPartitions = new ArrayList<>();
+        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+        when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
+        when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+        
+        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        
+        assertThat("Expected filter to pass only topics with exact name 
matches", matchedPartitions, 
+            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new 
TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+            
+    }
+    
+    private PartitionInfo createPartitionInfo(String topic, int partition) {
+        return new PartitionInfo(topic, partition, null, null, null);
+    }
+    
+}
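
The test drives NamedTopicFilter through a mocked consumer. For orientation, a hedged sketch of the same calls against a real consumer; the broker address and deserializer settings are assumptions, while the constructor and getFilteredTopicPartitions calls are taken from the test above:

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.NamedTopicFilter;

    public class NamedTopicFilterSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Exact names only; "test-1" does not also pick up "test-11".
                NamedTopicFilter filter = new NamedTopicFilter("test-1", "test-11");
                List<TopicPartition> partitions = filter.getFilteredTopicPartitions(consumer);
                System.out.println(partitions);
            }
        }
    }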

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
new file mode 100644
index 0000000..335ab31
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.kafka.spout.PatternTopicFilter;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PatternTopicFilterTest {
+
+    private KafkaConsumer<?, ?> consumerMock;
+    
+    @Before
+    public void setUp(){
+        consumerMock = mock(KafkaConsumer.class);
+    }
+    
+    @Test
+    public void testFilter() {
+        Pattern pattern = Pattern.compile("test-\\d+");
+        PatternTopicFilter filter = new PatternTopicFilter(pattern);
+        
+        String matchingTopicOne = "test-1";
+        String matchingTopicTwo = "test-11";
+        String unmatchedTopic = "unmatched";
+        
+        Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
+        allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+        List<PartitionInfo> testTwoPartitions = new ArrayList<>();
+        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+        allTopics.put(matchingTopicTwo, testTwoPartitions);
+        allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+        
+        when(consumerMock.listTopics()).thenReturn(allTopics);
+        
+        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        
+        assertThat("Expected topic partitions matching the pattern to be 
passed by the filter", matchedPartitions,
+            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new 
TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+    }
+    
+    private PartitionInfo createPartitionInfo(String topic, int partition) {
+        return new PartitionInfo(topic, partition, null, null, null);
+    }
+}
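
The assertion is consistent with full-name matching: "test-11" passes "test-\d+" while "unmatched" does not. Whether PatternTopicFilter uses Matcher.matches() internally is not shown in this diff, but the anchored semantics can be checked standalone:

    import java.util.regex.Pattern;

    public class PatternMatchSketch {
        public static void main(String[] args) {
            Pattern pattern = Pattern.compile("test-\\d+");
            // matches() anchors at both ends, so only whole-name matches pass:
            System.out.println(pattern.matcher("test-1").matches());    // true
            System.out.println(pattern.matcher("test-11").matches());   // true
            System.out.println(pattern.matcher("unmatched").matches()); // false
        }
    }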

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
new file mode 100644
index 0000000..f5b9423
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnit;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.mockito.ArgumentCaptor;
+
+public class SingleTopicKafkaUnitSetupHelper {
+
+    /**
+     * Using the given KafkaUnit instance, put some messages in the specified topic.
+     *
+     * @param kafkaUnit The KafkaUnit instance to use
+     * @param topicName The topic to produce messages for
+     * @param msgCount The number of messages to produce
+     */
+    public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int msgCount) throws Exception {
+        kafkaUnit.createTopic(topicName);
+        
+        for (int i = 0; i < msgCount; i++) {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
+                topicName, Integer.toString(i),
+                Integer.toString(i));
+            kafkaUnit.sendMessage(producerRecord);
+        }
+    }
+
+    /*
+     * Asserts that commitSync has been called once, 
+     * that there are commits for only one topic partition,
+     * and that the committed offset covers messageCount messages
+     */
+    public static <K, V> void verifyAllMessagesCommitted(KafkaConsumer<K, V> consumerSpy,
+        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture, long messageCount) {
+        verify(consumerSpy, times(1)).commitSync(commitCapture.capture());
+        Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+        assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
+        OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
+        assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount));
+    }
+
+    /**
+     * Open and activate a KafkaSpout that acts as a single-task/executor spout.
+     *
+     * @param <K> Kafka key type
+     * @param <V> Kafka value type
+     * @param spout The spout to prepare
+     * @param topoConf The topoConf
+     * @param topoContextMock The TopologyContext mock
+     * @param collectorMock The output collector mock
+     */
+    public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
+        SpoutOutputCollector collectorMock) throws Exception {
+        when(topoContextMock.getThisTaskIndex()).thenReturn(0);
+        when(topoContextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
+        spout.open(topoConf, topoContextMock, collectorMock);
+        spout.activate();
+    }
+
+}
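
A hedged sketch of how these helpers might compose in a test. The KafkaUnit instance and the spout construction are assumed to come from the surrounding test fixture, which this diff does not show; only the helper calls themselves are taken from the class above:

    import static org.mockito.Mockito.mock;

    import java.util.HashMap;
    import org.apache.storm.kafka.KafkaUnit;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;

    public class SetupHelperSketch {
        // Hypothetical test body; kafkaUnit and spout come from the fixture.
        static <K, V> void runScenario(KafkaUnit kafkaUnit, KafkaSpout<K, V> spout) throws Exception {
            SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnit, "test-topic", 10);
            TopologyContext contextMock = mock(TopologyContext.class);
            SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
            SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, new HashMap<String, Object>(), contextMock, collectorMock);
            // ...drive spout.nextTuple()/spout.ack(...) for all 10 messages, then call
            // verifyAllMessagesCommitted(consumerSpy, commitCapture, 10) with a spied consumer.
        }
    }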

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
new file mode 100644
index 0000000..3aad61e
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mockingDetails;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class SpoutWithMockedConsumerSetupHelper {
+
+    /**
+     * Creates, opens and activates a KafkaSpout using a mocked consumer. The subscription should be a mock object, since this method skips
+     * the subscription and instead just configures the mocked consumer to act as if the specified partitions are assigned to it.
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spoutConfig The spout config to use
+     * @param topoConf The topo conf to pass to the spout
+     * @param contextMock The topo context to pass to the spout
+     * @param collectorMock The mocked collector to pass to the spout
+     * @param consumerMock The mocked consumer
+     * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it.
+     * @return The spout
+     */
+    public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
+        TopologyContext contextMock, SpoutOutputCollector collectorMock, final KafkaConsumer<K, V> consumerMock, TopicPartition... assignedPartitions) {
+        Subscription subscriptionMock = spoutConfig.getSubscription();
+        if (!mockingDetails(subscriptionMock).isMock()) {
+            throw new IllegalStateException("Use a mocked subscription when using this method, it helps avoid complex stubbing");
+        }
+
+        final Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions));
+
+        when(consumerMock.assignment()).thenReturn(assignedPartitionsSet);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                ConsumerRebalanceListener listener = (ConsumerRebalanceListener) invocation.getArguments()[1];
+                listener.onPartitionsAssigned(assignedPartitionsSet);
+                return null;
+            }
+
+        }).when(subscriptionMock).subscribe(any(KafkaConsumer.class), any(ConsumerRebalanceListener.class), any(TopologyContext.class));
+
+        KafkaConsumerFactory<K, V> consumerFactory = new KafkaConsumerFactory<K, V>() {
+            @Override
+            public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+                return consumerMock;
+            }
+        };
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(topoConf, contextMock, collectorMock);
+        spout.activate();
+
+        return spout;
+    }
+
+    /**
+     * Creates sequential dummy records
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param topic The topic partition to create records for
+     * @param startingOffset The starting offset of the records
+     * @param numRecords The number of records to create
+     * @return The dummy records
+     */
+    public static <K, V> List<ConsumerRecord<K, V>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
+        List<ConsumerRecord<K, V>> recordsForPartition = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            recordsForPartition.add(new ConsumerRecord<K, V>(topic.topic(), topic.partition(), startingOffset + i, null, null));
+        }
+        return recordsForPartition;
+    }
+
+    /**
+     * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spout The spout
+     * @param consumerMock The consumer used by the spout
+     * @param expectedEmits The number of expected emits
+     * @param collectorMock The collector used by the spout
+     * @param partition The partition to emit messages on
+     * @param offsetsToEmit The offsets to emit
+     * @return The message ids emitted by the spout during the nextTuple calls
+     */
+    public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits,
+        SpoutOutputCollector collectorMock, TopicPartition partition, int... offsetsToEmit) {
+        return pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, Collections.singletonMap(partition, offsetsToEmit));
+    }
+
+    /**
+     * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spout The spout
+     * @param consumerMock The consumer used by the spout
+     * @param expectedEmits The number of expected emits
+     * @param collectorMock The collector used by the spout
+     * @param offsetsToEmit The offsets to emit per partition
+     * @return The message ids emitted by the spout during the nextTuple calls
+     */
+    public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits,
+        SpoutOutputCollector collectorMock, Map<TopicPartition, int[]> offsetsToEmit) {
+        int totalOffsets = 0;
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+        for (Entry<TopicPartition, int[]> entry : offsetsToEmit.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            List<ConsumerRecord<K, V>> tpRecords = new ArrayList<>();
+            for (Integer offset : entry.getValue()) {
+                tpRecords.add(new ConsumerRecord<K, V>(tp.topic(), tp.partition(), offset, null, null));
+                totalOffsets++;
+            }
+            records.put(tp, tpRecords);
+        }
+
+        when(consumerMock.poll(anyLong()))
+            .thenReturn(new ConsumerRecords<>(records));
+
+        for (int i = 0; i < totalOffsets; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(expectedEmits)).emit(anyString(), anyList(), messageIds.capture());
+        return messageIds.getAllValues();
+    }
+
+}
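
A hedged sketch chaining setupSpout and pollAndEmit. The KafkaSpoutConfig construction is not part of this diff, so it is taken as a parameter; its subscription must be a Mockito mock, which setupSpout enforces:

    import static org.mockito.Mockito.mock;

    import java.util.HashMap;
    import java.util.List;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;

    public class MockedConsumerSketch {

        @SuppressWarnings("unchecked")
        static List<KafkaSpoutMessageId> emitFiveRecords(KafkaSpoutConfig<String, String> spoutConfig) {
            KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
            TopologyContext contextMock = mock(TopologyContext.class);
            SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
            TopicPartition partition = new TopicPartition("test", 0);

            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
                spoutConfig, new HashMap<String, Object>(), contextMock, collectorMock, consumerMock, partition);

            // Stub a poll of offsets 0..4 and expect exactly 5 emits on the collector.
            return SpoutWithMockedConsumerSetupHelper.pollAndEmit(
                spout, consumerMock, 5, collectorMock, partition, 0, 1, 2, 3, 4);
        }
    }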
