Repository: storm Updated Branches: refs/heads/1.x-branch 4e1e62667 -> d5ead6681
STORM-3046: Ensure KafkaTridentSpoutEmitter handles empty batches correctly when they occur at the beginning of the stream Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e63f5528 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e63f5528 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e63f5528 Branch: refs/heads/1.x-branch Commit: e63f5528bc2f807578560ec3f783f326e1aec2a0 Parents: 554ff4a Author: Stig Rohde Døssing <s...@apache.org> Authored: Sat Apr 28 13:01:44 2018 +0200 Committer: Stig Rohde Døssing <s...@apache.org> Committed: Tue May 29 21:09:46 2018 +0200 ---------------------------------------------------------------------- .../spout/trident/KafkaTridentSpoutEmitter.java | 54 ++++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e63f5528/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java index 3b4aa4b..167cbab 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java @@ -59,11 +59,11 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident // Kafka private final KafkaConsumer<K, V> kafkaConsumer; - + // Bookkeeping private final KafkaTridentSpoutManager<K, V> kafkaManager; - // set of topic-partitions for which first poll has already occurred, and the first polled txid - private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); + // The first seek offset for each topic partition, i.e. the offset this spout instance started processing at. + private final Map<TopicPartition, Long> tpToFirstSeekOffset = new HashMap<>(); // Declare some KafkaTridentSpoutManager references for convenience private final long pollTimeoutMs; @@ -86,7 +86,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident this.topologyContext = topologyContext; this.refreshSubscriptionTimer = refreshSubscriptionTimer; this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator(); - + final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig(); this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs(); this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); @@ -124,7 +124,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident // pause other topic-partitions to only poll from current topic-partition pausedTopicPartitions = pauseTopicPartitions(currBatchTp); - seek(currBatchTp, lastBatchMeta, tx.getTransactionId()); + seek(currBatchTp, lastBatchMeta); // poll if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { @@ -162,16 +162,17 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident * Determines the offset of the next fetch. Will use the firstPollOffsetStrategy if this is the first poll for the topic partition. * Otherwise the next offset will be one past the last batch, based on lastBatchMeta. * - * <p>lastBatchMeta should only be null when the previous txid was not emitted (e.g. new topic), - * it is the first poll for the spout instance, or it is a replay of the first txid this spout emitted on this partition. - * In the second case, there are either no previous transactions, or the MBC is still committing them - * and they will fail because this spout did not emit the corresponding batches. If it had emitted them, the meta could not be null. - * In any case, the lastBatchMeta should never be null if this is not the first poll for this spout instance. + * <p>lastBatchMeta should only be null in the following cases: + * <ul> + * <li>This is the first batch for this partition</li> + * <li>This is a replay of the first batch for this partition</li> + * <li>This is batch n for this partition, where batch 0...n-1 were all empty</li> + * </ul> * * @return the offset of the next fetch */ - private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) { - if (isFirstPoll(tp, transactionId)) { + private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) { + if (isFirstPoll(tp)) { if (firstPollOffsetStrategy == EARLIEST) { LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp); kafkaConsumer.seekToBeginning(Collections.singleton(tp)); @@ -188,10 +189,20 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp); kafkaConsumer.seekToEnd(Collections.singleton(tp)); } - firstPollTransaction.put(tp, transactionId); - } else { + tpToFirstSeekOffset.put(tp, kafkaConsumer.position(tp)); + } else if (lastBatchMeta != null) { kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch LOG.debug("First poll for topic partition [{}], using last batch metadata", tp); + } else { + /* + * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance. + * This is either a replay of the first batch for this partition, or all previous batches were empty, + * otherwise last batch meta could not be null. Use the offset the consumer started at. + */ + long initialFetchOffset = tpToFirstSeekOffset.get(tp); + kafkaConsumer.seek(tp, initialFetchOffset); + LOG.debug("First poll for topic partition [{}], no last batch metadata present." + + " Using stored initial fetch offset [{}]", tp, initialFetchOffset); } final long fetchOffset = kafkaConsumer.position(tp); @@ -199,9 +210,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident return fetchOffset; } - private boolean isFirstPoll(TopicPartition tp, long txid) { - // The first poll is either the "real" first transaction, or a replay of the first transaction - return !firstPollTransaction.containsKey(tp) || firstPollTransaction.get(tp) == txid; + private boolean isFirstPoll(TopicPartition tp) { + return !tpToFirstSeekOffset.containsKey(tp); } // returns paused topic-partitions. @@ -248,14 +258,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId); return taskTps; } - + private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) { final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps == null ? 0 : tps.size()); if (tps != null) { - for (TopicPartition tp : tps) { - LOG.trace("Added topic-partition [{}]", tp); - kttp.add(new KafkaTridentSpoutTopicPartition(tp)); - } + for (TopicPartition tp : tps) { + LOG.trace("Added topic-partition [{}]", tp); + kttp.add(new KafkaTridentSpoutTopicPartition(tp)); + } } return kttp; }