STORM-2913: Add metadata to at-most-once and at-least-once commits

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e756889a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e756889a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e756889a

Branch: refs/heads/master
Commit: e756889aa712aee22c216bc99ee17b972abf886d
Parents: eff32a3
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Sat Jan 27 15:15:45 2018 +0100
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Sun Feb 4 23:18:54 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 80 ++++++-----------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  4 +-
 .../spout/internal/CommitMetadataManager.java   | 91 ++++++++++++++++++++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 45 ++++++----
 4 files changed, 146 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
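
Background: every commit path (at-least-once, at-most-once, and no-guarantee) now attaches a JSON metadata string identifying the committing topology, so a restarted topology can tell whether a committed offset was written by one of its own spouts. The standalone sketch below only illustrates the underlying kafka-clients mechanism (an OffsetAndMetadata carrying a metadata string); the broker address, topic, group id, and metadata fields are illustrative assumptions, not code from this commit.

// Sketch only: shows how a JSON payload can ride along with an offset commit
// via OffsetAndMetadata, the mechanism this commit relies on. Requires a broker
// reachable at the placeholder address; names below are illustrative assumptions.
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitMetadataSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("example-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            // Serialize an identifier for the committing process, analogous to CommitMetadata.
            String metadata = new ObjectMapper().writeValueAsString(
                Collections.singletonMap("topologyId", "example-topology-id"));

            // Attach the JSON string to the committed offset; Kafka stores it with the
            // offset and returns it from consumer.committed(tp).
            Map<TopicPartition, OffsetAndMetadata> toCommit =
                Collections.singletonMap(tp, new OffsetAndMetadata(consumer.position(tp), metadata));
            consumer.commitSync(toCommit);

            OffsetAndMetadata committed = consumer.committed(tp);
            System.out.println("Committed offset " + committed.offset() + " with metadata " + committed.metadata());
        }
    }
}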


http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 84e7851..9d133a7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -23,8 +23,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
@@ -53,6 +51,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
 import org.apache.storm.kafka.spout.internal.CommitMetadata;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -72,7 +71,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     //Initial delay for the commit and subscription refresh timers
     public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
 
     // Storm
     protected SpoutOutputCollector collector;
@@ -104,8 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
-    // Metadata information to commit to Kafka. It is unique per spout per topology.
-    private transient String commitMetadata;
+    private transient CommitMetadataManager commitMetadataManager;
     private transient KafkaOffsetMetric kafkaOffsetMetric;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
@@ -142,7 +139,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = new HashMap<>();
-        setCommitMetadata(context);
+        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
 
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
@@ -154,7 +151,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void registerMetric() {
         LOG.info("Registering Spout Metrics");
-        kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer);
+        kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
         context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
     }
 
@@ -168,16 +165,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return true;
     }
 
-    private void setCommitMetadata(TopologyContext context) {
-        try {
-            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
-                context.getStormId(), context.getThisTaskId(), 
Thread.currentThread().getName()));
-        } catch (JsonProcessingException e) {
-            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
-            throw new RuntimeException(e);
-        }
-    }
-
     private boolean isAtLeastOnceProcessing() {
         return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
     }
@@ -215,8 +202,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 retryService.retainAll(partitions);
 
                 /*
-                 * Emitted messages for partitions that are no longer assigned to this spout can't
-                 * be acked and should not be retried, hence remove them from emitted collection.
+                 * Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence
+                 * remove them from emitted collection.
                  */
                 emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition()));
             }
@@ -246,7 +233,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
             if (committedOffset != null) {
                 // offset was previously committed for this consumer group and topic-partition, either by this or another topology.
-                if (isOffsetCommittedByThisTopology(newTp, committedOffset)) {
+                if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                     // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
                     kafkaConsumer.seek(newTp, committedOffset.offset());
                 } else {
@@ -274,31 +261,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-    /**
-     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. This info is used to decide if
-     * {@link FirstPollOffsetStrategy} should be applied
-     *
-     * @param tp topic-partition
-     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
-     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
-     */
-    private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) {
-        try {
-            if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) {
-                return true;
-            }
-
-            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
-            return committedMetadata.getTopologyId().equals(context.getStormId());
-        } catch (IOException e) {
-            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
-                + "for this topic-partition was done using an earlier version of Storm. "
-                + "Defaulting to behavior compatible with earlier version", committedOffset);
-            LOG.trace("", e);
-            return false;
-        }
-    }
-
     // ======== Next Tuple =======
     @Override
     public void nextTuple() {
@@ -311,7 +273,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 if (isAtLeastOnceProcessing()) {
                     commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
-                    commitFetchedOffsetsAsync(kafkaConsumer.assignment());
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                    kafkaConsumer.commitAsync(offsetsToCommit, null);
+                    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                 }
             }
 
@@ -396,7 +361,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 numPolledRecords);
             if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                 //Commit polled records immediately to ensure delivery is at-most-once.
-                kafkaConsumer.commitSync();
+                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                kafkaConsumer.commitSync(offsetsToCommit);
+                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
             }
             return consumerRecords;
         } finally {
@@ -469,11 +437,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record);
         } else {
             final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
-            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, 
committedOffset)
-                && committedOffset.offset() > record.offset()) {
+            if (isAtLeastOnceProcessing()
+                && committedOffset != null 
+                && committedOffset.offset() > record.offset()
+                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, 
committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                 // Ensures that after a topology with this id is started, the 
consumer fetch
                 // position never falls behind the committed offset 
(STORM-2844)
-                throw new IllegalStateException("Attempting to emit a message 
that has already been committed.");
+                throw new IllegalStateException("Attempting to emit a message 
that has already been committed."
+                    + " This should never occur when using the at-least-once 
processing guarantee.");
             }
 
             final List<Object> tuple = 
kafkaSpoutConfig.getTranslator().apply(record);
@@ -519,13 +490,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
     }
 
-    private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) {
+    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
         Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
         for (TopicPartition tp : assignedPartitions) {
-            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
         }
-        kafkaConsumer.commitAsync(offsetsToCommit, null);
-        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+        return offsetsToCommit;
     }
     
     private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
@@ -536,7 +506,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
         for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
-            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata);
+            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index c2305cb..40e449a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -474,7 +474,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                  * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
                  * requests an offset that was deleted.
                  */
-                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
                     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
                 builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
@@ -488,7 +488,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                     + " Some messages may be processed more than once.");
             }
         }
-        LOG.info("Setting consumer property '{}' to 'false', because the spout 
does not support auto-commit",
+        LOG.info("Setting Kafka consumer property '{}' to 'false', because the 
spout does not support auto-commit",
             ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false);
     }
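
Note: the two log messages above correspond to consumer properties the spout enforces. Below is a minimal sketch of the resulting Kafka consumer configuration for at-least-once processing; the broker address and group id are placeholder assumptions.

// Sketch of the consumer properties the spout's validation described above ends up
// enforcing for at-least-once processing. Broker address and group id are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class SpoutConsumerPropsSketch {
    public static Properties atLeastOnceConsumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // Seek to the earliest available offset instead of the latest when the
        // requested offset has been deleted, so records are not silently skipped.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // The spout manages commits itself, so Kafka's auto-commit stays disabled.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }
}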

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
new file mode 100644
index 0000000..a63619c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public final class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+            this.processingGuarantee = processingGuarantee;
+        } catch (JsonProcessingException e) {
+            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+     *
+     * @param tp The topic partition the commit metadata belongs to.
+     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+     * @param offsetManagers The offset managers.
+     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+     */
+    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
+        Map<TopicPartition, OffsetManager> offsetManagers) {
+        try {
+            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+                && offsetManagers.containsKey(tp)
+                && offsetManagers.get(tp).hasCommitted()) {
+                return true;
+            }
+
+            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+            return committedMetadata.getTopologyId().equals(context.getStormId());
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize expected commit metadata [{}]."
+                + " This error is expected to occur once per partition, if the 
last commit to each partition"
+                + " was by an earlier version of the KafkaSpout, or by a 
process other than the KafkaSpout. "
+                + "Defaulting to behavior compatible with earlier version", 
committedOffset);
+            LOG.trace("", e);
+            return false;
+        }
+    }
+
+    public String getCommitMetadata() {
+        return commitMetadata;
+    }
+
+}
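
Usage note: on the read-back side, the spout fetches the committed OffsetAndMetadata and asks this manager whether the stored metadata names the running topology. A rough sketch of that check with plain Jackson follows; the "topologyId" field name mirrors CommitMetadata.getTopologyId(), and the sample JSON layout is an assumption for illustration, not taken from this commit.

// Sketch of the read-back side: given the metadata string stored with a committed
// offset, decide whether it was written by the currently running topology.
// The "topologyId" field name and sample payload below are illustrative assumptions.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

public class CommittedMetadataCheckSketch {
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    static boolean committedByThisTopology(String committedMetadataJson, String thisTopologyId) {
        try {
            JsonNode metadata = JSON_MAPPER.readTree(committedMetadataJson);
            JsonNode topologyId = metadata.get("topologyId");
            return topologyId != null && thisTopologyId.equals(topologyId.asText());
        } catch (IOException e) {
            // Metadata written by an older spout version, or by another process,
            // will not parse; fall back to treating it as not committed by this topology.
            return false;
        }
    }

    public static void main(String[] args) {
        String stored = "{\"topologyId\":\"topo-1\",\"taskId\":3,\"threadName\":\"Thread-7\"}";
        System.out.println(committedByThisTopology(stored, "topo-1")); // true
        System.out.println(committedByThisTopology(stored, "topo-2")); // false
        System.out.println(committedByThisTopology("not json", "topo-1")); // false
    }
}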

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 12391c8..a9e7c6c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,11 +21,12 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -84,12 +86,19 @@ public class KafkaSpoutMessagingGuaranteeTest {
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
 
         spout.nextTuple();
+        
+        when(consumerMock.position(partition)).thenReturn(1L);
 
         //The spout should have emitted the tuple, and must have committed it before emit
         InOrder inOrder = inOrder(consumerMock, collectorMock);
         inOrder.verify(consumerMock).poll(anyLong());
-        inOrder.verify(consumerMock).commitSync();
+        inOrder.verify(consumerMock).commitSync(commitCapture.capture());
         inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+        
+        CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+        assertThat(committedOffsets.get(partition).offset(), is(0L));
+        assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
     }
 
     private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
@@ -172,7 +181,13 @@ public class KafkaSpoutMessagingGuaranteeTest {
         doTestModeCannotReplayTuples(spoutConfig);
     }
 
-    private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+    @Test
+    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setTupleTrackingEnforced(true)
+            .build();
         try (SimulatedTime time = new SimulatedTime()) {
             KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
 
@@ -180,6 +195,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
                 SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
 
             spout.nextTuple();
+            clearInvocations(consumerMock);
 
             ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
@@ -189,21 +205,15 @@ public class KafkaSpoutMessagingGuaranteeTest {
             
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
             
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+            
             spout.nextTuple();
             
-            verify(consumerMock, never()).commitSync(any());
+            verify(consumerMock, never()).commitSync(argThat(arg -> {
+                return !arg.containsKey(partition);
+            }));
         }
     }
-
-    @Test
-    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
-            .setTupleTrackingEnforced(true)
-            .build();
-        doTestModeDoesNotCommitAckedTuples(spoutConfig);
-    }
     
     @Test
     public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
@@ -233,9 +243,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
             
             verify(consumerMock).commitAsync(commitCapture.capture(), isNull());
             
-            Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue();
-            assertThat(commit.containsKey(partition), is(true));
-            assertThat(commit.get(partition).offset(), is(1L));
+            CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+            assertThat(committedOffsets.get(partition).offset(), is(1L));
+            assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
         }
     }
 
