[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed

2016-09-20, JIRA

[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507741#comment-15507741 ]

Stig Rohde Døssing commented on STORM-2106:
-------------------------------------------

My pleasure :)

> Storm Kafka Client is paused while failed tuples are replayed
> -------------------------------------------------------------
>
> Key: STORM-2106
> URL: https://issues.apache.org/jira/browse/STORM-2106
> Project: Apache Storm
> Issue Type: Bug
> Reporter: Jeff Fenchel
>
> With the changes in STORM-2087, the Kafka 0.10 spout will be limited to
> emitting tuples that fall within a single poll() batch from Kafka. This
> means that if the first tuple in a batch from Kafka fails, the spout will
> not emit more than that one batch until the tuple is either processed
> successfully or given up on. This behavior is exacerbated by the
> exponential backoff retry policy.
> There probably needs to be bookkeeping for the next emittable offset.
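
For illustration, one possible shape of that "next emittable offset" bookkeeping (a purely hypothetical sketch with made-up names, not code from the ticket or from storm-kafka-client):

{code}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-partition "next emittable offset" bookkeeping.
// All names here are illustrative; nothing below is from storm-kafka-client.
public class NextEmittableOffsetSketch {
    // partition id -> first offset that has not yet been emitted
    private final Map<Integer, Long> nextEmittable = new HashMap<>();

    // Record that an offset was emitted; never move the marker backwards,
    // so a seek back to a failed offset does not reset forward progress.
    void onEmit(int partition, long offset) {
        nextEmittable.merge(partition, offset + 1, Math::max);
    }

    // Where the spout could resume emitting new tuples on this partition,
    // even while an older failed offset is being retried.
    long nextEmittableOffset(int partition) {
        return nextEmittable.getOrDefault(partition, 0L);
    }
}
{code}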





[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed

2016-09-20, Jeff Fenchel (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507712#comment-15507712 ]

Jeff Fenchel commented on STORM-2106:
-------------------------------------

[~Srdo] Thank you so much for the explanation here! You are 100% correct. As it 
turns out, I did this to myself while hacking around some exceptions caused by 
rebalancing. I'll close the ticket. Thanks again for your time here!



[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed

2016-09-20, JIRA

[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507662#comment-15507662 ]

Stig Rohde Døssing commented on STORM-2106:
-------------------------------------------

[~jfenc91] I don't think it works that way. doSeekRetriableTopicPartitions only 
seeks to the committed offset on x's TopicPartition if 
retryService.retriableTopicPartitions returns that TopicPartition (the for-loop 
otherwise doesn't include it), and it only does that if there are failed tuples 
ready for retry on that TopicPartition. When x is ready for retry, 
retryService.retriableTopicPartitions returns x's TopicPartition, the consumer 
seeks to x, and the consumer is polled; x should then be emitted. When x is 
emitted, emitTupleIfNotEmitted removes the messageId from retryService, which 
should prevent retryService.retriableTopicPartitions from returning x's 
TopicPartition until x (or some other message on x's TopicPartition) fails 
again.
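
The bookkeeping is roughly this shape (a minimal sketch with illustrative names; the real logic lives in KafkaSpoutRetryExponentialBackoff):

{code}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of the retry bookkeeping described above. Names are
// illustrative; see KafkaSpoutRetryExponentialBackoff for the real code.
public class RetryBookkeepingSketch {
    // messageId -> earliest time (millis) at which the tuple may be retried
    private final Map<String, Long> retrySchedule = new HashMap<>();

    // fail(): schedule the tuple for retry once its backoff expires
    void onFail(String msgId, long backoffMs) {
        retrySchedule.put(msgId, System.currentTimeMillis() + backoffMs);
    }

    // emitTupleIfNotEmitted(): re-emitting removes the schedule entry, so the
    // partition is not reported retriable again until the tuple fails anew
    void onReemit(String msgId) {
        retrySchedule.remove(msgId);
    }

    // retriableTopicPartitions(): only tuples whose backoff has expired count
    Set<String> retriableMessages() {
        long now = System.currentTimeMillis();
        Set<String> ready = new HashSet<>();
        retrySchedule.forEach((id, time) -> { if (time <= now) ready.add(id); });
        return ready;
    }
}
{code}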

While x is still processing, the spout shouldn't seek back to x (and doesn't, 
as far as I can tell). Is it possible that you were limited by something else, 
like maxSpoutPending, retrying a large number of tuples, or partition 
reassignments/worker restarts?

See 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294
 and 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java#L164



[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed

2016-09-20, Jeff Fenchel (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507512#comment-15507512 ]

Jeff Fenchel commented on STORM-2106:
-------------------------------------

[~Srdo] I don't think it quite works that way. In the case you provided, I 
agree that x+1..y will be skipped while x is processing. However, in 
doSeekRetriableTopicPartitions we seek to the next offset that may be 
committed, based on acked messages. So in this case, since x is still 
processing, the spout seeks back to x and filters out all of x..y again, 
because x is processing and the rest are acked. The result is that nothing 
gets emitted until x finishes processing.

For me, the batch size Kafka provides (~300) was much smaller than 
maxUncommittedOffsets, which severely crippled the throughput of my topology.

I agree that we should poll Kafka again at y, which is why I made this 
ticket.


Code snippet: 
{code}
private void doSeekRetriableTopicPartitions() {
    final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

    for (TopicPartition rtp : retriableTopicPartitions) {
        final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
        if (offsetAndMeta != null) {
            // seek to the next offset that is ready to commit in next commit cycle
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
        } else {
            // seek to the offset just after the last committed offset
            kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);
        }
    }
}
{code}
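
To spell out the stall, here is a minimal self-contained simulation of what I believe happens while x is in flight (all names are made up; this is not spout code):

{code}
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation of the stall described above: while x is in
// flight, every cycle seeks back to x, re-fetches x..y, and filters
// everything out, so nothing new is emitted until x completes.
public class StallSimulation {
    public static void main(String[] args) {
        long x = 100, y = 105;
        Set<Long> acked = new HashSet<>();
        for (long o = x + 1; o <= y; o++) acked.add(o); // x+1..y already acked
        boolean xInFlight = true; // x was emitted but not yet acked or failed

        for (int cycle = 1; cycle <= 3; cycle++) {
            // each cycle seeks back to committedOffset + 1 == x and re-polls x..y
            int emitted = 0;
            for (long offset = x; offset <= y; offset++) {
                boolean skip = (offset == x && xInFlight) || acked.contains(offset);
                if (!skip) emitted++;
            }
            System.out.println("cycle " + cycle + ": emitted " + emitted); // always 0
        }
    }
}
{code}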






[jira] [Commented] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed

2016-09-20, JIRA

[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507304#comment-15507304 ]

Stig Rohde Døssing commented on STORM-2106:
-------------------------------------------

Are you sure this is an issue? I get that the spout will restart at the failed 
offset if it crashes, or that another spout will have to restart at the last 
committed offset if partitions are reassigned, but as far as I can tell it 
should progress past the failed tuple once it's reemitted or the retry service 
decides to delay it. doSeekRetriableTopicPartitions only seeks on those 
partitions that have failed tuples ready to retry (i.e. whose retry delays 
have expired).

If you have a sequence of offsets x...y and x has failed, x-1 was committed and 
everything up to y has been acked, doSeekRetriableTopicPartitions should seek 
to x and retry it. The offsets between x and y will then be skipped by 
emitTupleIfNotEmitted because they've already been acked. On the next call to 
nextTuple after skipping ]x...y[, the spout should poll Kafka again at y 
because waitingToEmit is empty (x was emitted and the others were already 
acked). If x gets acked there's no problem. If x fails again and the retry 
service decides to delay it, doSeekRetriableTopicPartitions won't seek back to 
x until the backoff expires. In the meantime new offsets get processed. When 
x's retry delay expires, the spout will just seek back to x, emit it again and 
skip past ]x...y...z[ (z being the last message it managed to ack before 
seeking back to x).
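
To make that timeline concrete, a small self-contained sketch (illustrative names only, not the actual spout code):

{code}
import java.util.HashSet;
import java.util.Set;

// Sketch of the timeline above: x failed, its retry delay expired, and
// everything in ]x...z] was acked in the meantime. Names are illustrative.
public class RetryTimelineSketch {
    public static void main(String[] args) {
        long x = 100, z = 110;
        Set<Long> acked = new HashSet<>();
        for (long o = x + 1; o <= z; o++) acked.add(o); // all but x acked

        // The spout seeks back to x and re-polls x..z, but
        // emitTupleIfNotEmitted skips offsets that were already acked,
        // so only x is re-emitted and new polling resumes at z + 1.
        for (long offset = x; offset <= z; offset++) {
            if (!acked.contains(offset)) {
                System.out.println("re-emit offset " + offset); // prints only 100
            }
        }
    }
}
{code}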

There are a few other factors in play here like maxUncommittedOffsets, but I 
don't really see the issue you're describing.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)