[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once
[ https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045388#comment-16045388 ] Prasanna Ranganathan commented on STORM-2343:
-
{quote} Instead of pausing nonretriable partitions, we could instead keep track of numUncommittedOffsets per partition, so we can pause only those partitions that have no retriable tuples and are at the maxUncommittedOffsets limit. That way unhealthy partitions can't block healthy partitions, and we avoid the case described above where a failed tuple on one partition causes new (limit-breaking) tuples to be emitted on a different partition. {quote}
Yes, this is what we should do: pause partitions only in the above scenario. In the special case of a spout handling only one partition, we can simply skip poll() instead of pausing, even when this condition is met.
Noted your update on the Kafka consumer pause being locally managed. Makes sense. STORM-2542 is interesting. Will comment on it in that JIRA once I catch up on it.
> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets
> tuples fail at once
> ---
>
> Key: STORM-2343
> URL: https://issues.apache.org/jira/browse/STORM-2343
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.0.0, 1.1.0
> Reporter: Stig Rohde Døssing
> Assignee: Stig Rohde Døssing
> Priority: Critical
> Fix For: 2.0.0, 1.1.1
>
> Time Spent: 14h 50m
> Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all
> cases. If the underlying consumer returns more records in a call to poll()
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since
> maxUncommittedOffsets isn't being respected when retrieving or emitting
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets.
> If more than maxUncommittedOffsets messages fail, this can cause the spout to
> stop polling entirely.
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
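The per-partition approach agreed on above can be sketched as follows. This is an illustrative Python model of the decision logic only, with made-up names (the actual spout is Java code in storm-kafka-client): pause only partitions that have no retriable tuples and have hit the maxUncommittedOffsets limit, and with a single partition skip poll() entirely instead of pausing.

```python
# Illustrative model of the pause/poll decision discussed above.
# All names are hypothetical; this is not the storm-kafka-client API.

def partitions_to_pause(uncommitted_per_partition, retriable_partitions,
                        max_uncommitted_offsets):
    """Pause partitions that have no retriable tuples and have hit the
    per-partition maxUncommittedOffsets limit."""
    if len(uncommitted_per_partition) == 1:
        # Single-partition special case: skip poll() instead of pausing.
        return set()
    return {p for p, n in uncommitted_per_partition.items()
            if p not in retriable_partitions and n >= max_uncommitted_offsets}

def should_poll(uncommitted_per_partition, retriable_partitions,
                max_uncommitted_offsets):
    """Poll if any tuples are retriable, or any partition is under the limit."""
    return bool(retriable_partitions) or any(
        n < max_uncommitted_offsets
        for n in uncommitted_per_partition.values())
```

Under this rule an unhealthy partition at the limit gets paused while healthy partitions keep being polled, which is exactly the isolation property argued for in the thread.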
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045053#comment-16045053 ] Roshan Naik commented on STORM-2359:
{quote} The current implementation has a pending map in both the acker and the spout, which rotate every topology.message.timeout.secs.{quote}
Need to see if we can eliminate the timeout logic from the spout and have it only in the ACKer (I can think of some issues). If we must retain that logic in the spouts, the timeout value that it operates on (full tuple tree processing) would have to be separated from the timeout value that the ACKer uses to track progress between stages.
{quote}The spout then reemitted the expired tuples, and they got into the queue behind their own expired instances. {quote}
Perfect example indeed. The motivation of this JIRA is to try to eliminate/mitigate triggering of timeouts for queued/in-flight tuples that are not lost. The only time we need timeouts/re-emits to be triggered is when one or more tuples in the tuple tree are truly lost. *I think* that can only happen if a worker/bolt/spout died, so the case you're describing should not happen if we solve this problem correctly.
IMO, the ideal solution would have the spouts re-emit only the specific tuples whose tuple trees had some loss due to a worker going down. I am not yet certain whether or not the initial idea described in the doc is the optimal solution. Perhaps a better way is to trigger such re-emits only if a worker/bolt/spout went down.
> Revising Message Timeouts
> -
>
> Key: STORM-2359
> URL: https://issues.apache.org/jira/browse/STORM-2359
> Project: Apache Storm
> Issue Type: Sub-task
> Components: storm-core
> Affects Versions: 2.0.0
> Reporter: Roshan Naik
>
> A revised strategy for message timeouts is proposed here.
> Design Doc:
> https://docs.google.com/document/d/1am1kO7Wmf17U_Vz5_uyBB2OuSsc4TZQWRvbRhX52n5w/edit?usp=sharing
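The pending maps mentioned in the quote, rotated once per topology.message.timeout.secs, can be modeled as a small rotating bucket map. The following is a toy Python sketch of that rotation idea, not Storm's actual RotatingMap implementation:

```python
class RotatingPendingMap:
    """Toy model of a pending map that expires entries after a fixed
    number of rotations (Storm rotates once per message timeout)."""

    def __init__(self, num_buckets=2):
        # Newest bucket at index 0; oldest at the end.
        self.buckets = [dict() for _ in range(num_buckets)]

    def put(self, msg_id, tuple_info):
        # New pending tuples always land in the newest bucket.
        self.buckets[0][msg_id] = tuple_info

    def ack(self, msg_id):
        # An ack removes the tuple from whichever bucket holds it.
        for bucket in self.buckets:
            if msg_id in bucket:
                return bucket.pop(msg_id)
        return None

    def rotate(self):
        """Drop the oldest bucket and return its entries as expired."""
        expired = self.buckets.pop()
        self.buckets.insert(0, {})
        return expired
```

A tuple that is acked before its bucket ages out never expires; one that is not acked within num_buckets rotations is returned by rotate() and would be failed, which is the timeout behavior the comment wants to move out of the spout.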
[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once
[ https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045046#comment-16045046 ] Stig Rohde Døssing commented on STORM-2343:
---
Wait, I think this is fixable. Instead of pausing nonretriable partitions, we could instead keep track of numUncommittedOffsets per partition, so we can pause only those partitions that have no retriable tuples and are at the maxUncommittedOffsets limit. That way unhealthy partitions can't block healthy partitions, and we avoid the case described above where a failed tuple on one partition causes new (limit-breaking) tuples to be emitted on a different partition.
[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once
[ https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044979#comment-16044979 ] Stig Rohde Døssing commented on STORM-2343:
---
{quote} We need to confirm the behaviour in this scenario and handle it accordingly in the spout. {quote}
As far as I know, pausing/resuming is a purely local operation for the KafkaConsumer. It just causes it to not fetch records for the paused partitions. The paused state is not preserved if the client crashes (because the local state is then lost), or if the consumers rebalance (see https://github.com/apache/kafka/blob/2af4dd8653dd6717cca1630a57b2835a2698a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L49). I don't think we need to worry about this. Also, I'm pushing for dropping support for Kafka-managed subscriptions here: https://github.com/apache/storm/pull/2151, so I'm hoping this ends up being irrelevant.
{quote} I am assuming we need NOT pause partition 0 in solution #3 for the scenario described {quote}
The reason we want to pause is that when the spout is at (or past) the maxUncommittedOffsets limit, it should only emit retries or a very limited number of new tuples. In the example I gave above, if we don't pause partition 0, then the poll triggered to fetch offset 99 on partition 1 might just return a full batch of messages from partition 0. There is no guarantee that the poll will even contain the retriable tuple, so we might do this multiple times. If there were 10 additional partitions, we might get full polls for any of those as well before we get the retriable tuple. If we don't pause, we can't really enforce maxUncommittedOffsets as far as I can tell. I agree that if there's only one partition it should never be paused. The rest of your outline seems right to me as well.
{quote} For Storm spout, Kafka Partitions enable scaling and isolation among other things. It is not acceptable for a 'healthy' partition to be blocked by an 'unhealthy' one {quote}
I don't think the healthy partitions will be blocked for very long. Each poll where we pause will re-emit (or discard, pending the fix for STORM-2546) some retriable tuples. The only way the spout should be completely blocked due to retries is if the user hasn't configured a retry limit and the tuples fail consistently. I agree that it isn't ideal, but I don't see a way to have a limit like maxUncommittedOffsets be properly enforced without pausing (and thus blocking) the healthy partitions when we get into this state where maxUncommittedOffsets is violated.
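Stig's argument above, that past the limit a poll must not be able to return new tuples from non-retriable partitions, reduces to a simple rule. Here is a hedged Python sketch of that rule; the names are illustrative, and in the real spout this decision would drive KafkaConsumer's pause()/resume() calls:

```python
def partitions_to_pause_globally(num_uncommitted, max_uncommitted,
                                 all_partitions, retriable_partitions):
    """At or past the global maxUncommittedOffsets limit, pause every
    partition without retriable tuples, so the next poll can only return
    retries, never new (limit-breaking) tuples."""
    if num_uncommitted >= max_uncommitted:
        return set(all_partitions) - set(retriable_partitions)
    return set()  # under the limit: nothing needs pausing
```

In the two-partition example from the comment, once the spout is past the limit with only partition 1 retriable, partition 0 is paused, so the poll for offset 99 cannot come back as a full batch from partition 0.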
[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once
[ https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044883#comment-16044883 ] Prasanna Ranganathan commented on STORM-2343:
-
Thanks a ton for the awesome writeup of the issue and potential solutions. My thoughts so far around potential solutions are mostly in line with yours. I wanted to confirm, but did not get around to confirming, the behaviour of the Kafka Broker / Group Coordinator when the client node that paused a partition crashes, leaves the group, or suffers a network partition before calling resume() for that partition. We need to confirm the behaviour in this scenario and handle it accordingly in the spout.
About Solution #3: I am assuming we need NOT pause partition 0 in solution #3 for the scenario described. This solution, to me, is basically extending the current logic around maxUncommittedOffsets to every partition in the spout. If a spout handles only one partition, then we would never really pause it. We simply stop calling poll if a partition reaches maxUncommittedOffsets without any failed tuples. Otherwise the partition should continue to be polled. The logic should then simply take care of seeking to the appropriate offset depending on whether retriable tuples are present.
Agree completely that the choice is between #2 and #3. Am leaning toward #3 for the following reasons:
- Partition is a fundamental building block / concept in Kafka, and this solution fits neatly into it and extends it
- For Storm spout, Kafka Partitions enable scaling and isolation among other things. It is not acceptable for a 'healthy' partition to be blocked by an 'unhealthy' one
- We do a fair bit of partition-specific bookkeeping in OffsetManager already. More bookkeeping is a fair price to pay given the reward on offer. :-)
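The partition-specific bookkeeping in OffsetManager mentioned in the last bullet is, at its core, tracking acked offsets per partition and finding the highest offset that is safe to commit. A simplified sketch of that idea (illustrative only, not the actual OffsetManager code):

```python
def next_commit_offset(committed_offset, acked_offsets):
    """Walk forward from the last committed offset through the contiguous
    run of acked offsets; everything up to the returned offset can be
    committed without skipping over an unacked tuple."""
    offset = committed_offset
    while offset + 1 in acked_offsets:
        offset += 1
    return offset
```

For example, with offset 0 committed and offsets {1, 2, 3, 5} acked, only offset 3 is committable, because offset 4 is still outstanding. Extending solution #3 means keeping this kind of state, plus an uncommitted count, for every partition.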
[jira] [Resolved] (STORM-2525) Fix flaky integration tests
[ https://issues.apache.org/jira/browse/STORM-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing resolved STORM-2525.
---
Resolution: Fixed
> Fix flaky integration tests
> ---
>
> Key: STORM-2525
> URL: https://issues.apache.org/jira/browse/STORM-2525
> Project: Apache Storm
> Issue Type: Bug
> Components: integration-test
> Affects Versions: 2.0.0
> Reporter: Stig Rohde Døssing
> Assignee: Stig Rohde Døssing
> Fix For: 2.0.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> The integration tests fail fairly often, e.g.
> https://travis-ci.org/apache/storm/jobs/233690012. The tests should be fixed
> so they're more reliable.
[jira] [Updated] (STORM-2525) Fix flaky integration tests
[ https://issues.apache.org/jira/browse/STORM-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing updated STORM-2525:
--
Fix Version/s: 2.0.0
[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once
[ https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044785#comment-16044785 ] Stig Rohde Døssing commented on STORM-2343:
---
[~ranganp] I spent a while thinking about this, but it seems to me to be something where there are a lot of corner cases to consider. Here's my best effort.
Regarding fixing STORM-2546: The only way to know that a tuple has been deleted from Kafka is to try polling for it. We can know for sure that a failed tuple has been deleted if we seek to the failed tuple's offset (or earlier) on the relevant partition and poll, and we then encounter a tuple that has a higher offset than the failed tuple on that partition earlier in the result set. For instance: offsets 0...5 have failed and also been compacted away, offset 6 has failed and is present, and offset 7 has failed and is not present. We seek to offset 0 for the partition. If we then see that the first message in the poll result is offset 6, we can be sure that offsets 0...5 are deleted, because otherwise they would have been returned in the poll. Offset 7 cannot be removed from the spout, because we can't be sure that it was deleted; the consumer may just have received too few messages.
I believe we can also conclude that offsets have been removed if we seek to their offsets, poll and receive an empty result. I'm not entirely sure about this, but I don't think the consumer will return empty polls if there are more messages to consume.
I think we can use this method to remove failed, deleted tuples from the offset manager. When we do a poll, we examine the retriable tuples for each partition. For each partition where we received tuples, we compare the earliest received tuple to the retriable tuples for that partition. If the offset of a given retriable tuple is lower than the offset of the earliest received tuple, then the retriable tuple must have been deleted.
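The deletion check described above (seek to the earliest failed offset, poll, and treat anything below the first returned offset as compacted away) can be sketched as follows. The function name and shape are hypothetical, not actual spout code:

```python
def provably_deleted(retriable_offsets, earliest_polled_offset):
    """After seeking to the earliest retriable offset and polling, any
    retriable offset below the first offset the broker returned must have
    been deleted (e.g. compacted away). Gaps later in the result prove
    nothing, since the poll may simply have returned too few messages."""
    return {o for o in retriable_offsets if o < earliest_polled_offset}
```

Using the worked example: offsets 0...7 have failed, 0...5 are compacted away, and the first polled message is offset 6, so exactly 0...5 are provably deleted, while offset 7 must stay in the spout.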
About this issue: The fact that failed tuples can be removed from Kafka before they can be retried is something I overlooked in what I wrote earlier. I think either solution can deal with it, though.
One correction to what I wrote earlier regarding emitTupleIfNotEmitted filtering, btw: we should also pause partitions in this solution, IMO. Otherwise it is possible (even likely if there are few retriable partitions) to allow a poll due to retriable tuples and get no retriable tuples back from it, in which case we'll discard all the messages and try again later. I think that would make the solution unacceptably wasteful (we'd risk multiple useless polls for unrelated partitions every time we have to retry a tuple while at the maxUncommittedOffsets limit), so we should pause nonretriable partitions.
The solutions I see to this issue right now are:
* Don't enforce maxUncommittedOffsets at all if there are retriable tuples. This is simple to implement, but I don't really have a good feeling for the likelihood that maxUncommittedOffsets will be exceeded by "too much". Example of this functionality:
MaxUncommittedOffsets is 100
MaxPollRecords is 10
Committed offset for partitions 0 and 1 is 0.
Partition 0 has emitted 0
Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away)
Partition 1, message 97 is retriable
The spout seeks to message 97 and polls
It gets back offsets 99, 101, 103 and potentially 7 new tuples. Say the new tuples are in the range 104-110. If any of 104-110 become retriable, the spout may emit another set of 9 (maxPollRecords - 1) tuples. This can repeat for each newly emitted set. The likelihood of this happening in real life is unclear to me.
* Enforce maxUncommittedOffsets globally by always allowing poll if there are retriable tuples, pausing any non-retriable partitions if the spout has passed the maxUncommittedOffsets limit, and filtering out fresh tuples from the poll result. This should work to enforce maxUncommittedOffsets. In order to avoid dropping messages, the consumer has to seek back to the earliest offset on each partition that was filtered out by this new check. As far as I can tell, we won't be increasing the number of discarded tuples by an unreasonable number as long as we pause non-retriable partitions. This is because the spout will currently discard any acked or already-emitted offset it receives in a poll. This solution will additionally discard those that are entirely new, which means they have to have a higher offset than the newest currently emitted tuple on the retried partition. It seems (assuming tuple failures are evenly distributed in the emitted set) more likely to me that most retries will happen somewhere "in the middle" of the currently emitted tuples. Example of this functionality:
MaxUncommittedOffsets is 100
MaxPollRecords is 10
Committed offset for partitions 0 and 1 is 0.
Partition 0 has emitted 0
Partition 1 has emit