[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed
[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507741#comment-15507741 ]

Stig Rohde Døssing commented on STORM-2106:
--------------------------------------------

My pleasure :)

> Storm Kafka Client is paused while failed tuples are replayed
> -------------------------------------------------------------
>
>          Key: STORM-2106
>          URL: https://issues.apache.org/jira/browse/STORM-2106
>      Project: Apache Storm
>   Issue Type: Bug
>     Reporter: Jeff Fenchel
>
> With the changes in STORM-2087, the kafka 10 spout will be limited to emitting
> tuples that are within the poll() size for kafka. This means that if the
> first tuple in a batch from kafka is failed, the spout will not emit more
> than the size of the batch from kafka until the tuple is either processed
> successfully or given up on. This behavior is exacerbated by the exponential
> backoff retry policy.
> There probably needs to be bookkeeping for the next emittable offset.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed
[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507712#comment-15507712 ]

Jeff Fenchel commented on STORM-2106:
--------------------------------------

[~Srdo] Thank you so much for the explanation here! You are 100% correct. As it turns out, I did this to myself by hacking around some exceptions caused by rebalance. I'll close the ticket. Thanks again for your time here!
[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed
[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507662#comment-15507662 ]

Stig Rohde Døssing commented on STORM-2106:
--------------------------------------------

[~jfenc91] I don't think it works that way. doSeekRetriableTopicPartitions only seeks to the committed offset if retryService.retriableTopicPartitions returns x's TopicPartition (since the for-loop otherwise doesn't include x's TopicPartition), which it only does if there are failed tuples ready for retry on that TopicPartition.

When x is ready for retry, retryService.retriableTopicPartitions will return x's TopicPartition, the consumer seeks to x, and the consumer is polled. x should then be emitted. When x is emitted, emitTupleIfNotEmitted removes the messageId from retryService, which should prevent retryService.retriableTopicPartitions from returning x's TopicPartition until x (or some other message on x's TopicPartition) fails again. While x is still processing, the spout shouldn't seek back to x (and doesn't, as far as I can tell).

Is it possible that you were limited by something else, like maxSpoutPending, retrying a large number of tuples, or partition reassignments/worker restarts?

See https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294 and https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java#L164
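The gating behavior described above can be sketched as a small model (hypothetical names and data structures, not the actual storm-kafka-client classes): a partition only becomes eligible for a seek while it has a failed tuple whose retry backoff has expired, and re-emitting the tuple removes it from the retry service again.

```java
import java.util.*;

// Sketch of the retry gating described above: a partition is returned by
// retriablePartitions() only while it has a failed tuple whose backoff has
// expired. Hypothetical model mirroring the spout's behavior, not real code.
public class RetryGateSketch {
    // partition -> (failed offset -> time in ms at which it becomes retriable)
    static Map<String, TreeMap<Long, Long>> failed = new HashMap<>();

    static void fail(String partition, long offset, long retryAtMs) {
        failed.computeIfAbsent(partition, p -> new TreeMap<>()).put(offset, retryAtMs);
    }

    // Analogue of emitTupleIfNotEmitted removing the messageId from the
    // retry service once the tuple has been re-emitted.
    static void markEmitted(String partition, long offset) {
        TreeMap<Long, Long> m = failed.get(partition);
        if (m != null) m.remove(offset);
    }

    // Analogue of retryService.retriableTopicPartitions(): only partitions
    // holding a failed tuple with an expired backoff are eligible for a seek.
    static Set<String> retriablePartitions(long nowMs) {
        Set<String> ready = new TreeSet<>();
        for (Map.Entry<String, TreeMap<Long, Long>> e : failed.entrySet()) {
            for (long retryAt : e.getValue().values()) {
                if (retryAt <= nowMs) { ready.add(e.getKey()); break; }
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        fail("topic-0", 100L, 5_000L);                   // offset 100 failed, backoff expires at t=5000
        System.out.println(retriablePartitions(1_000L)); // backoff not expired -> prints []
        System.out.println(retriablePartitions(6_000L)); // expired -> prints [topic-0]
        markEmitted("topic-0", 100L);                    // re-emitted: removed from retry service
        System.out.println(retriablePartitions(6_000L)); // no longer eligible -> prints []
    }
}
```

Under this model the spout cannot keep seeking back to x while x is merely in flight, which is the point of the comment above.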
[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed
[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507512#comment-15507512 ]

Jeff Fenchel commented on STORM-2106:
--------------------------------------

[~Srdo] I don't think it quite works that way. In the case you provided, I agree with you that x+1..y will be skipped while x is processing. However, in doSeekRetriableTopicPartitions we seek to the next offset that may be committed based on acked messages. So in this case, since x is still processing, it seeks back to x and filters out all of x..y again, because x is processing and the rest are acked. The result is that nothing gets emitted until x finishes processing. For me the batch size kafka provided (~300) was << maxUncommittedOffsets, which severely crippled the throughput of my topology. I agree that we should poll kafka again at y, and that is why I made this ticket.

Code snippet:
{code}
private void doSeekRetriableTopicPartitions() {
    final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
    for (TopicPartition rtp : retriableTopicPartitions) {
        final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
        if (offsetAndMeta != null) {
            // seek to the next offset that is ready to commit in the next commit cycle
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
        } else {
            // seek to the last committed offset
            kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);
        }
    }
}
{code}
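The seek target in the snippet above hinges on findNextCommitOffset: the next commit offset is the end of the contiguous run of acked offsets immediately after the committed offset. A minimal sketch of that idea (a hypothetical helper, not the real OffsetManager implementation):

```java
import java.util.TreeSet;

// Sketch of the findNextCommitOffset idea the snippet above relies on:
// starting just past the committed offset, walk the contiguous run of
// acked offsets; the first gap (an unacked/failed tuple) stops the walk.
public class NextCommitSketch {
    // Returns the highest committable offset, or -1 if nothing past
    // committedOffset is contiguously acked (analogue of returning null).
    static long findNextCommitOffset(long committedOffset, TreeSet<Long> acked) {
        long next = committedOffset;
        while (acked.contains(next + 1)) {
            next++;
        }
        return next == committedOffset ? -1 : next;
    }

    public static void main(String[] args) {
        TreeSet<Long> acked = new TreeSet<>(java.util.Arrays.asList(5L, 6L, 7L, 9L, 10L));
        // 5..7 are contiguous after committed offset 4; offset 8 (e.g. the
        // failed tuple x) is the gap, so the commit frontier stops at 7.
        System.out.println(findNextCommitOffset(4L, acked)); // prints 7
        // Nothing acked past the committed offset -> no commit candidate.
        System.out.println(findNextCommitOffset(10L, acked)); // prints -1
    }
}
```

This is why, in the scenario described above, the seek keeps landing just past the commit frontier while x is outstanding: the frontier cannot advance past the gap left by x.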
[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed
[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507304#comment-15507304 ]

Stig Rohde Døssing commented on STORM-2106:
--------------------------------------------

Are you sure this is an issue? I get that the spout will restart at the failed offset if it crashes or partitions are reassigned and another spout has to restart at the last committed offset, but as far as I can tell it should progress past the failed tuple when it's reemitted or the retry service decides to delay it.

doSeekRetriableTopicPartitions only seeks on those partitions that have failed tuples ready to retry (i.e. whose retry delays have expired). If you have a sequence of offsets x...y where x has failed, x-1 was committed and everything up to y has been acked, doSeekRetriableTopicPartitions should seek to x and retry it. The offsets between x and y will then be skipped by emitTupleIfNotEmitted because they've already been acked. On the next call to nextTuple after skipping ]x...y[, the spout should poll Kafka again at y because waitingToEmit is empty (x was emitted and the others were already acked).

If x gets acked there's no problem. If x fails again and the retry service decides to delay it, doSeekRetriableTopicPartitions won't seek back to x until the backoff expires. In the meantime new offsets get processed. When x's retry delay expires, the spout will just seek back to x, emit it again and skip past ]x...y...z[ (z being the last message it managed to ack before seeking back to x).

There are a few other factors in play here like maxUncommittedOffsets, but I don't really see the issue you're describing.
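The skip step in the scenario above can be sketched as a small model (hypothetical names, not the actual KafkaSpout code): after seeking back to the failed offset x, the poll returns x..y again, but the emit loop skips everything already acked or still in flight, so only x is re-emitted.

```java
import java.util.*;

// Sketch of the emitTupleIfNotEmitted skip behavior described above:
// offsets that are already acked, or already emitted and pending, are
// skipped rather than re-emitted. Hypothetical model, not real spout code.
public class SkipSketch {
    static List<Long> emitBatch(List<Long> polled, Set<Long> acked, Set<Long> emitted) {
        List<Long> reallyEmitted = new ArrayList<>();
        for (long offset : polled) {
            if (acked.contains(offset) || emitted.contains(offset)) {
                continue; // already handled: skip instead of re-emitting
            }
            emitted.add(offset);
            reallyEmitted.add(offset);
        }
        return reallyEmitted;
    }

    public static void main(String[] args) {
        // x = 8 failed; 9..12 already acked; the seek replays 8..12
        List<Long> polled = Arrays.asList(8L, 9L, 10L, 11L, 12L);
        Set<Long> acked = new HashSet<>(Arrays.asList(9L, 10L, 11L, 12L));
        System.out.println(emitBatch(polled, acked, new HashSet<>())); // prints [8]
    }
}
```

Only the failed tuple is re-emitted from the replayed range, which is why the comment above argues the spout should then be free to poll again at y.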