STORM-2913: Add metadata to at-most-once and at-least-once commits
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e756889a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e756889a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e756889a Branch: refs/heads/master Commit: e756889aa712aee22c216bc99ee17b972abf886d Parents: eff32a3 Author: Stig Rohde Døssing <s...@apache.org> Authored: Sat Jan 27 15:15:45 2018 +0100 Committer: Stig Rohde Døssing <s...@apache.org> Committed: Sun Feb 4 23:18:54 2018 +0100 ---------------------------------------------------------------------- .../apache/storm/kafka/spout/KafkaSpout.java | 80 ++++++----------- .../storm/kafka/spout/KafkaSpoutConfig.java | 4 +- .../spout/internal/CommitMetadataManager.java | 91 ++++++++++++++++++++ .../spout/KafkaSpoutMessagingGuaranteeTest.java | 45 ++++++---- 4 files changed, 146 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 84e7851..9d133a7 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -23,8 +23,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -53,6 +51,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; import org.apache.storm.kafka.spout.internal.CommitMetadata; +import org.apache.storm.kafka.spout.internal.CommitMetadataManager; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; import org.apache.storm.kafka.spout.internal.OffsetManager; @@ -72,7 +71,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout { //Initial delay for the commit and subscription refresh timers public static final long TIMER_DELAY_MS = 500; private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); - private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); // Storm protected SpoutOutputCollector collector; @@ -104,8 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Triggers when a subscription should be refreshed private transient Timer refreshSubscriptionTimer; private transient TopologyContext context; - // Metadata information to commit to Kafka. It is unique per spout per topology. - private transient String commitMetadata; + private transient CommitMetadataManager commitMetadataManager; private transient KafkaOffsetMetric kafkaOffsetMetric; public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { @@ -142,7 +139,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { offsetManagers = new HashMap<>(); emitted = new HashSet<>(); waitingToEmit = new HashMap<>(); - setCommitMetadata(context); + commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee()); tupleListener.open(conf, context); if (canRegisterMetrics()) { @@ -154,7 +151,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private void registerMetric() { LOG.info("Registering Spout Metrics"); - kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer); + kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer); context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs()); } @@ -168,16 +165,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout { return true; } - private void setCommitMetadata(TopologyContext context) { - try { - commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata( - context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())); - } catch (JsonProcessingException e) { - LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e); - throw new RuntimeException(e); - } - } - private boolean isAtLeastOnceProcessing() { return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE; } @@ -215,8 +202,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { retryService.retainAll(partitions); /* - * Emitted messages for partitions that are no longer assigned to this spout can't - * be acked and should not be retried, hence remove them from emitted collection. + * Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence + * remove them from emitted collection. */ emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition())); } @@ -246,7 +233,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { if (committedOffset != null) { // offset was previously committed for this consumer group and topic-partition, either by this or another topology. - if (isOffsetCommittedByThisTopology(newTp, committedOffset)) { + if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(offsetManagers))) { // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply. kafkaConsumer.seek(newTp, committedOffset.offset()); } else { @@ -274,31 +261,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } } - /** - * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. This info is used to decide if - * {@link FirstPollOffsetStrategy} should be applied - * - * @param tp topic-partition - * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka - * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise - */ - private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) { - try { - if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) { - return true; - } - - final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class); - return committedMetadata.getTopologyId().equals(context.getStormId()); - } catch (IOException e) { - LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit " - + "for this topic-partition was done using an earlier version of Storm. " - + "Defaulting to behavior compatible with earlier version", committedOffset); - LOG.trace("", e); - return false; - } - } - // ======== Next Tuple ======= @Override public void nextTuple() { @@ -311,7 +273,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(kafkaConsumer.assignment()); } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) { - commitFetchedOffsetsAsync(kafkaConsumer.assignment()); + Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = + createFetchedOffsetsMetadata(kafkaConsumer.assignment()); + kafkaConsumer.commitAsync(offsetsToCommit, null); + LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } } @@ -396,7 +361,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { numPolledRecords); if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) { //Commit polled records immediately to ensure delivery is at-most-once. - kafkaConsumer.commitSync(); + Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = + createFetchedOffsetsMetadata(kafkaConsumer.assignment()); + kafkaConsumer.commitSync(offsetsToCommit); + LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } return consumerRecords; } finally { @@ -469,11 +437,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); - if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset) - && committedOffset.offset() > record.offset()) { + if (isAtLeastOnceProcessing() + && committedOffset != null + && committedOffset.offset() > record.offset() + && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) { // Ensures that after a topology with this id is started, the consumer fetch // position never falls behind the committed offset (STORM-2844) - throw new IllegalStateException("Attempting to emit a message that has already been committed."); + throw new IllegalStateException("Attempting to emit a message that has already been committed." + + " This should never occur when using the at-least-once processing guarantee."); } final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record); @@ -519,13 +490,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout { return tuple != null || kafkaSpoutConfig.isEmitNullTuples(); } - private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) { + private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) { Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(); for (TopicPartition tp : assignedPartitions) { - offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp))); + offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata())); } - kafkaConsumer.commitAsync(offsetsToCommit, null); - LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); + return offsetsToCommit; } private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) { @@ -536,7 +506,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>(); for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) { - final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata); + final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata()); if (nextCommitOffset != null) { nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset); } http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index c2305cb..40e449a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -474,7 +474,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer * requests an offset that was deleted. */ - LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing", + LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) { @@ -488,7 +488,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { + " Some messages may be processed more than once."); } } - LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit", + LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); } http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java new file mode 100644 index 0000000..a63619c --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java @@ -0,0 +1,91 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Map; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; +import org.apache.storm.task.TopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generates and reads commit metadata. + */ +public final class CommitMetadataManager { + + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class); + // Metadata information to commit to Kafka. It is unique per spout instance. + private final String commitMetadata; + private final ProcessingGuarantee processingGuarantee; + private final TopologyContext context; + + /** + * Create a manager with the given context. + */ + public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) { + this.context = context; + try { + commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata( + context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())); + this.processingGuarantee = processingGuarantee; + } catch (JsonProcessingException e) { + LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e); + throw new RuntimeException(e); + } + } + + /** + * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. + * + * @param tp The topic partition the commit metadata belongs to. + * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka + * @param offsetManagers The offset managers. + * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise + */ + public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset, + Map<TopicPartition, OffsetManager> offsetManagers) { + try { + if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE + && offsetManagers.containsKey(tp) + && offsetManagers.get(tp).hasCommitted()) { + return true; + } + + final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class); + return committedMetadata.getTopologyId().equals(context.getStormId()); + } catch (IOException e) { + LOG.warn("Failed to deserialize expected commit metadata [{}]." + + " This error is expected to occur once per partition, if the last commit to each partition" + + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. " + + "Defaulting to behavior compatible with earlier version", committedOffset); + LOG.trace("", e); + return false; + } + } + + public String getCommitMetadata() { + return commitMetadata; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index 12391c8..a9e7c6c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -21,11 +21,12 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.CommitMetadataManager; import org.apache.storm.kafka.spout.subscription.Subscription; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -84,12 +86,19 @@ public class KafkaSpoutMessagingGuaranteeTest { SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); + + when(consumerMock.position(partition)).thenReturn(1L); //The spout should have emitted the tuple, and must have committed it before emit InOrder inOrder = inOrder(consumerMock, collectorMock); inOrder.verify(consumerMock).poll(anyLong()); - inOrder.verify(consumerMock).commitSync(); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); + + CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE); + Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue(); + assertThat(committedOffsets.get(partition).offset(), is(0L)); + assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata())); } private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) { @@ -172,7 +181,13 @@ public class KafkaSpoutMessagingGuaranteeTest { doTestModeCannotReplayTuples(spoutConfig); } - private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) { + @Test + public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { + //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .setTupleTrackingEnforced(true) + .build(); try (SimulatedTime time = new SimulatedTime()) { KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition); @@ -180,6 +195,7 @@ public class KafkaSpoutMessagingGuaranteeTest { SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); + clearInvocations(consumerMock); ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); @@ -189,21 +205,15 @@ public class KafkaSpoutMessagingGuaranteeTest { Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + spout.nextTuple(); - verify(consumerMock, never()).commitSync(any()); + verify(consumerMock, never()).commitSync(argThat(arg -> { + return !arg.containsKey(partition); + })); } } - - @Test - public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { - //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted - KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) - .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) - .setTupleTrackingEnforced(true) - .build(); - doTestModeDoesNotCommitAckedTuples(spoutConfig); - } @Test public void testNoGuaranteeModeCommitsPolledTuples() throws Exception { @@ -233,9 +243,10 @@ public class KafkaSpoutMessagingGuaranteeTest { verify(consumerMock).commitAsync(commitCapture.capture(), isNull()); - Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue(); - assertThat(commit.containsKey(partition), is(true)); - assertThat(commit.get(partition).offset(), is(1L)); + CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE); + Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue(); + assertThat(committedOffsets.get(partition).offset(), is(1L)); + assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata())); } }