[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

2017-06-09 Thread Prasanna Ranganathan (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045388#comment-16045388
 ] 

Prasanna Ranganathan commented on STORM-2343:
-

{quote}
Instead of pausing nonretriable partitions, we could instead keep track of 
numUncommittedOffsets per partition, so we can pause only those partitions that 
have no retriable tuples and are at the maxUncommittedOffsets limit. That way 
unhealthy partitions can't block healthy partitions, and we avoid the case 
described above where a failed tuple on one partition causes new (limit 
breaking) tuples to be emitted on a different partition.
{quote}

Yes, this is what we should do: pause partitions only in the scenario above. In 
the special case of a spout handling only one partition, we can simply skip 
poll() instead of pausing, even when this condition is met.
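As a rough illustration, here is a minimal sketch of the per-partition bookkeeping being discussed. The OffsetManagerView accessors are hypothetical stand-ins for whatever per-partition counters the spout would keep, not the current storm-kafka-client API:

{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Sketch only: numUncommittedOffsets()/hasRetriableTuples() are hypothetical
// per-partition accessors, not the existing OffsetManager API.
final class PausePolicySketch {
    interface OffsetManagerView {
        long numUncommittedOffsets();
        boolean hasRetriableTuples();
    }

    static Set<TopicPartition> partitionsToPause(Map<TopicPartition, OffsetManagerView> managers,
                                                 int maxUncommittedOffsets) {
        Set<TopicPartition> toPause = new HashSet<>();
        if (managers.size() == 1) {
            // Single-partition spout: never pause; just skip poll() while at the limit.
            return toPause;
        }
        for (Map.Entry<TopicPartition, OffsetManagerView> e : managers.entrySet()) {
            OffsetManagerView om = e.getValue();
            // Pause only partitions that are at the limit and have nothing to retry.
            if (om.numUncommittedOffsets() >= maxUncommittedOffsets && !om.hasRetriableTuples()) {
                toPause.add(e.getKey());
            }
        }
        return toPause;
    }
}
{code}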

Noted your update on the Kafka consumer pause being locally managed. Makes sense.

STORM-2542 is interesting. Will comment on that in that JIRA once I catch up on 
it.

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> ---
>
> Key: STORM-2343
> URL: https://issues.apache.org/jira/browse/STORM-2343
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.1.0
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Critical
> Fix For: 2.0.0, 1.1.1
>
>  Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.





[jira] [Commented] (STORM-2359) Revising Message Timeouts

2017-06-09 Thread Roshan Naik (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045053#comment-16045053
 ] 

Roshan Naik commented on STORM-2359:


{quote} The current implementation has a pending map in both the acker and the 
spout, which rotate every topology.message.timeout.secs.{quote}
We need to see if we can eliminate the timeout logic from the spout and have it 
only in the ACKer (I can think of some issues). If we must retain that logic in 
the spouts, the timeout value they operate on (full tuple-tree processing) 
would have to be separated from the timeout value that the ACKer uses to track 
progress between stages.
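To make that separation concrete, here is a tiny, purely illustrative sketch. The second config key does not exist in Storm today and is only a hypothetical placeholder for the ACKer-side value:

{code:java}
import org.apache.storm.Config;

public class TimeoutConfigSketch {
    public static Config build() {
        Config conf = new Config();
        // Existing setting: the full tuple-tree processing timeout the spout operates on.
        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30);
        // Hypothetical setting (not a real Storm config): the per-stage progress
        // timeout the ACKer would use to track progress between stages.
        conf.put("topology.acker.progress.timeout.secs", 10);
        return conf;
    }
}
{code}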

{quote}The spout then reemitted the expired tuples, and they got into the queue 
behind their own expired instances. {quote}

Perfect example indeed. The motivation of this JIRA is to try to 
eliminate/mitigate triggering of timeouts for queued/in-flight tuples that are 
not lost. The only time we need timeouts/re-emits to be triggered is when one 
or more tuples in the tuple tree are truly lost. *I think* that can only happen 
if a worker/bolt/spout died. So the case you're describing should not happen if 
we solve this problem correctly.
 
IMO, the ideal solution would have the spouts re-emit only the specific tuples 
whose tuple trees had some loss due to a worker going down. I am not yet 
certain whether or not the initial idea described in the doc is the optimal 
solution. Perhaps a better way is to trigger such re-emits only if a 
worker/bolt/spout went down.

> Revising Message Timeouts
> -
>
> Key: STORM-2359
> URL: https://issues.apache.org/jira/browse/STORM-2359
> Project: Apache Storm
>  Issue Type: Sub-task
>  Components: storm-core
>Affects Versions: 2.0.0
>Reporter: Roshan Naik
>
> A revised strategy for message timeouts is proposed here.
> Design Doc:
>  
> https://docs.google.com/document/d/1am1kO7Wmf17U_Vz5_uyBB2OuSsc4TZQWRvbRhX52n5w/edit?usp=sharing





[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

2017-06-09 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045046#comment-16045046
 ] 

Stig Rohde Døssing commented on STORM-2343:
---

Wait, I think this is fixable. Instead of pausing nonretriable partitions, we 
could instead keep track of numUncommittedOffsets per partition, so we can 
pause only those partitions that have no retriable tuples and are at the 
maxUncommittedOffsets limit. That way unhealthy partitions can't block healthy 
partitions, and we avoid the case described above where a failed tuple on one 
partition causes new (limit breaking) tuples to be emitted on a different 
partition.






[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

2017-06-09 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044979#comment-16044979
 ] 

Stig Rohde Døssing commented on STORM-2343:
---

{quote}
We need to confirm the behaviour in this scenario and handle it accordingly in 
the spout.
{quote}
As far as I know, pausing/resuming is a purely local operation for the 
KafkaConsumer. It just causes the consumer to not fetch records for the paused 
partitions. The paused state is not preserved if the client crashes (because 
the local state is then lost), or if the consumers rebalance (see 
https://github.com/apache/kafka/blob/2af4dd8653dd6717cca1630a57b2835a2698a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L49).
I don't think we need to worry about this. Also, I'm pushing for us to drop 
support for Kafka-managed subscriptions in 
https://github.com/apache/storm/pull/2151, so I'm hoping this ends up being 
irrelevant.
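For reference, a minimal standalone example of the pause()/resume() behaviour described above; the broker address, group id and topic are placeholders:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "pause-demo");               // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder
            consumer.assign(Collections.singletonList(tp));

            // pause() only flips a flag in the consumer's local SubscriptionState;
            // nothing is sent to the broker, and the flag is lost if the client
            // crashes or the partition is reassigned on a rebalance.
            consumer.pause(Collections.singleton(tp));
            ConsumerRecords<String, String> records = consumer.poll(100L);
            System.out.println(records.isEmpty()); // true: paused partitions are not fetched

            consumer.resume(Collections.singleton(tp));
        }
    }
}
{code}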

{quote}
I am assuming we need NOT pause partition 0 in solution #3 for the scenario 
described
{quote}
The reason we want to pause is that when the spout is at (or past) the 
maxUncommittedOffsets limit, it should only emit retries or a very limited 
number of new tuples. In the example I gave above, if we don't pause partition 
0, then the poll triggered to fetch offset 99 on partition 1 might just return 
a full batch of messages from partition 0. There is no guarantee that the poll 
will even contain the retriable tuple, so we might do this multiple times. If 
there were 10 additional partitions we might get full polls for any of those as 
well before we get the retriable tuple.

If we don't pause, we can't really enforce maxUncommittedOffsets, as far as I 
can tell.
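A rough sketch of the pause-then-poll sequence I have in mind; the retriable partition and its earliest failed offset are assumed to be supplied by the spout's retry bookkeeping, so this is illustrative rather than the actual spout code:

{code:java}
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: the retriable partition and its earliest failed offset are
// assumed to come from the spout's retry bookkeeping.
final class RetryPollSketch {
    static void pollOnlyRetriable(KafkaConsumer<?, ?> consumer,
                                  TopicPartition retriablePartition,
                                  long earliestFailedOffset) {
        // At (or past) the maxUncommittedOffsets limit: pause every assigned
        // partition that has nothing to retry...
        Set<TopicPartition> toPause = new HashSet<>(consumer.assignment());
        toPause.remove(retriablePartition);
        consumer.pause(toPause);

        // ...seek back to the failed offset on the retriable partition...
        consumer.seek(retriablePartition, earliestFailedOffset);

        // ...and poll. The result can now only contain records from the retriable
        // partition, so a healthy partition (e.g. partition 0 in the example above)
        // cannot fill the whole batch and starve the retry.
        consumer.poll(100L);

        // Resume afterwards so normal polling continues once the limit is respected.
        consumer.resume(toPause);
    }
}
{code}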

I agree that if there's only one partition it should never be paused. The rest 
of your outline seems right to me as well.

{quote}
For Storm spout, Kafka Partitions enable scaling and isolation among other 
things. It is not acceptable for a 'healthy' partition to be blocked by an 
'unhealthy' one
{quote}
I don't think the healthy partitions will be blocked for very long. Each poll 
where we pause will re-emit (or discard, pending the fix for STORM-2546) some 
retriable tuples. The only way the spout should be completely blocked due to 
retries is if the user hasn't configured a retry limit and the tuples fail 
consistently.

I agree that it isn't ideal, but I don't see a way to have a limit like 
maxUncommittedOffsets be properly enforced without pausing (and thus blocking) 
the healthy partitions when we get into this state where maxUncommittedOffsets 
is violated.






[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

2017-06-09 Thread Prasanna Ranganathan (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044883#comment-16044883
 ] 

Prasanna Ranganathan commented on STORM-2343:
-

Thanks a ton for the awesome writeup of the issue and potential solutions. My 
thoughts so far are mostly in line with yours. I wanted to confirm, but did not 
get around to confirming, the behaviour of the Kafka broker / group coordinator 
when the client node that paused a partition crashes, leaves the group, or 
suffers a network partition before calling resume() for that partition. We need 
to confirm the behaviour in this scenario and handle it accordingly in the 
spout.

About Solution #3:
I am assuming we need NOT pause partition 0 in solution #3 for the scenario 
described. This solution, to me, is basically extending the current logic 
around maxUncommittedOffsets to every partition in the spout. If a spout 
handles only one partition, then we would never really pause it; we simply stop 
calling poll() if the partition reaches maxUncommittedOffsets without any 
failed tuples. Otherwise the partition should continue to be polled, and the 
logic should then simply take care of seeking to the appropriate offset 
depending on whether retriable tuples are present.
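Roughly, that per-partition decision could look like the sketch below; the numUncommitted and earliestRetriableOffset inputs are hypothetical stand-ins for per-partition bookkeeping that does not exist in this form today:

{code:java}
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: numUncommitted and earliestRetriableOffset are assumed to come
// from per-partition bookkeeping, not from the current OffsetManager API.
final class PollDecisionSketch {
    static boolean shouldFetch(KafkaConsumer<?, ?> consumer, TopicPartition tp,
                               long numUncommitted, OptionalLong earliestRetriableOffset,
                               int maxUncommittedOffsets) {
        if (earliestRetriableOffset.isPresent()) {
            // Failed tuples pending: keep fetching, starting from the earliest retriable offset.
            consumer.seek(tp, earliestRetriableOffset.getAsLong());
            return true;
        }
        // No failed tuples: fetch only while this partition is under its own limit.
        return numUncommitted < maxUncommittedOffsets;
    }
}
{code}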

I agree completely that the choice is between #2 and #3. I am leaning toward #3 
for the following reasons:
- The partition is a fundamental building block in Kafka, and this solution 
fits neatly into that concept and extends it.
- For the Storm spout, Kafka partitions enable scaling and isolation, among 
other things. It is not acceptable for a 'healthy' partition to be blocked by 
an 'unhealthy' one.
- We already do a fair bit of partition-specific bookkeeping in OffsetManager. 
More bookkeeping is a fair price to pay given the reward on offer. :-)






[jira] [Resolved] (STORM-2525) Fix flaky integration tests

2017-06-09 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing resolved STORM-2525.
---
Resolution: Fixed

> Fix flaky integration tests
> ---
>
> Key: STORM-2525
> URL: https://issues.apache.org/jira/browse/STORM-2525
> Project: Apache Storm
>  Issue Type: Bug
>  Components: integration-test
>Affects Versions: 2.0.0
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
> Fix For: 2.0.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> The integration tests fail fairly often, e.g. 
> https://travis-ci.org/apache/storm/jobs/233690012. The tests should be fixed 
> so they're more reliable.





[jira] [Updated] (STORM-2525) Fix flaky integration tests

2017-06-09 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated STORM-2525:
--
Fix Version/s: 2.0.0






[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

2017-06-09 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044785#comment-16044785
 ] 

Stig Rohde Døssing commented on STORM-2343:
---

[~ranganp] I spent a while thinking about this, but it seems to me to be 
something where there are a lot of corner cases to consider. Here's my best 
effort.
Regarding fixing STORM-2546:
The only way to know that a tuple has been deleted from Kafka is to try polling 
for it. We can know for sure that a failed tuple has been deleted if we seek to 
the failed tuple's offset (or earlier) on the relevant partition and poll, and 
the first tuple the poll returns for that partition has a higher offset than 
the failed tuple.

For instance:
Offsets 0...5 have failed and have also been compacted away. Offset 6 has 
failed and is still present; offset 7 has failed and is not present.
We seek to offset 0 for the partition.
If we then see that the first message in the poll result is offset 6, we can be 
sure that offsets 0...5 are deleted, because otherwise they would have been 
returned in the poll. Offset 7 cannot be removed from the spout because we 
can't be sure that it was deleted; the consumer may just have received too few 
messages.

I believe we can also conclude that offsets have been removed if we seek to 
their offsets, poll and receive an empty result. I'm not entirely sure about 
this, but I don't think the consumer will return empty polls if there are more 
messages to consume.

I think we can use this method to remove failed, deleted tuples from the offset 
manager. When we do a poll, we examine the retriable tuples for each partition. 
For each partition where we received tuples, we compare the earliest received 
tuple to the retriable tuples for that partition. If the offset of a given 
retriable tuple is lower than the offset of the earliest received tuple, then 
the retriable tuple must have been deleted. 
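A small sketch of that comparison for a single partition; the inputs are assumed to be collected from the poll result (earliest received offset) and from the retry service (retriable offsets):

{code:java}
import java.util.NavigableSet;
import java.util.TreeSet;

// Sketch only: inputs are assumed to be gathered per partition from the poll
// result and the retry service.
final class CompactionCheckSketch {
    static NavigableSet<Long> deletedRetriableOffsets(long earliestReceivedOffset,
                                                      NavigableSet<Long> retriableOffsets) {
        // Any retriable offset strictly below the earliest offset Kafka actually
        // returned must have been deleted/compacted away; otherwise the poll,
        // which seeked to or before it, would have returned it.
        return new TreeSet<>(retriableOffsets.headSet(earliestReceivedOffset, false));
    }
}
{code}

With the example above (first received offset 6, retriable offsets 0...7), this returns 0...5; offset 7 is correctly kept, since that poll cannot prove it was deleted.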

About this issue:
The fact that failed tuples can be removed from Kafka before they can be 
retried is something I overlooked in what I wrote earlier. I think either 
solution can deal with it though.

One correction to what I wrote earlier regarding emitTupleIfNotEmitted 
filtering, btw: we should also pause partitions in this solution, IMO. 
Otherwise it is possible (even likely if there are few retriable partitions) to 
allow poll due to retriable tuples, and get no retriable tuples from the poll, 
in which case we'll discard all the messages and try again later. I think it 
would make that solution unacceptably wasteful (we'd risk multiple useless 
polls for unrelated partitions every time we have to retry a tuple while at the 
maxUncommittedOffsets limit), so we should pause nonretriable partitions.

The solutions I see to this issue right now are:

* Don't enforce maxUncommittedOffsets if there are retriable tuples at all. 
This is simple to implement, but I don't really have a good feeling for what 
the likelihood is that maxUncommittedOffsets will be exceeded by "too much".

Example of this functionality:
MaxUncommittedOffsets is 100
MaxPollRecords is 10
Committed offset for partitions 0 and 1 is 0.
Partition 0 has emitted 0
Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away)
Partition 1, message 97 is retriable
The spout seeks to message 97 and polls
It gets back offsets 99, 101, 103 and potentially 7 new tuples. Say the new 
tuples are in the range 104-110.
If any of 104-110 become retriable, the spout may emit another set of 9 
(maxPollRecords - 1) tuples.
This can repeat for each newly emitted set. The likelihood of this happening in 
real life is unclear to me.

* Enforce maxUncommittedOffsets globally by always allowing a poll if there are 
retriable tuples, pausing any non-retriable partitions if the spout has passed 
the maxUncommittedOffsets limit, and filtering fresh tuples out of the poll 
result. This should work to enforce maxUncommittedOffsets. In order to avoid 
dropping messages, the consumer has to seek back to the earliest offset on each 
partition that was filtered out by this new check. As far as I can tell we 
won't be increasing the number of discarded tuples by an unreasonable amount as 
long as we pause non-retriable partitions. This is because the spout will 
currently discard any acked or already emitted offset it receives in a poll. 
This solution will additionally discard those tuples that are entirely new, 
which means they have to have a higher offset than the newest currently emitted 
tuple on the retried partition. It seems (assuming tuple failures are evenly 
distributed in the emitted set) more likely to me that most retries will happen 
somewhere "in the middle" of the currently emitted tuples. 

Example of this functionality:
MaxUncommittedOffsets is 100
MaxPollRecords is 10
Committed offset for partitions 0 and 1 is 0.
Partition 0 has emitted 0
Partition 1 has emit