GitHub user JasonMWhite opened a pull request:

    https://github.com/apache/spark/pull/10089

    [SPARK-12073] [Streaming] backpressure rate controller consumes events 
preferentially from lagging partitions
    
    I'm pretty sure this is the reason we couldn't easily recover from an 
unbalanced Kafka partition under heavy load when using backpressure.
    
    `maxMessagesPerPartition` calculates an appropriate limit for the message 
rate across all partitions, and then divides it by the number of partitions to 
determine how many messages to retrieve from each partition. The problem with 
this approach is that when one partition is behind by millions of records (due 
to random Kafka issues) but the rate estimator calculates that only 100k 
messages in total can be retrieved, each partition (out of, say, 32) retrieves 
at most 100k / 32 = 3125 messages.
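    To make the arithmetic concrete, here is a minimal Python sketch of the even split described above. The helper name `even_split` and the lag values are hypothetical; the sketch only mirrors the behaviour of `maxMessagesPerPartition` as described here, not Spark's actual Scala code. It also assumes a partition can never read past its latest offset, which shows how much of the budget goes unused.

```python
from typing import Dict


def even_split(total_limit: int, lags: Dict[int, int]) -> Dict[int, int]:
    """Divide the total message limit evenly, ignoring each partition's lag.

    Hypothetical helper mirroring the behaviour described above.
    """
    per_partition = total_limit // len(lags)
    return {partition: per_partition for partition in lags}


# Hypothetical lags: partition 0 is ~1M behind, the other 31 are 1k behind.
lags = {0: 1_001_000, **{p: 1_000 for p in range(1, 32)}}
caps = even_split(100_000, lags)
print(caps[0], caps[1])  # 3125 3125

# A partition can only read what it actually has, so the 31 caught-up
# partitions use just 1k of their 3125 each and the rest of the budget idles.
used = sum(min(caps[p], lag) for p, lag in lags.items())
print(used)  # 34125
```

Under these assumptions only about a third of the 100k budget is used per batch, and the lagging partition still gets just 3125 messages.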
    
    This PR (which still needs a test) determines a desired message count for 
each partition by using that partition's current lag to weight the total 
message limit proportionally among the partitions. For example, suppose every 
partition is 1k messages behind, and one partition starts an extra 1M behind. 
The total number of messages to retrieve is then (32 * 1k + 1M) = 1,032,000, of 
which that one partition needs 1,001,000. So it gets (1,001,000 / 1,032,000) ≈ 
97% of the 100k messages, and the other 31 partitions share the remaining 3%.
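    The weighting can be sketched in a few lines of Python. `lag_weighted_split` is a hypothetical name, and rounding down with integer division is an assumption of this sketch (it keeps the per-partition caps from summing to more than the limit); the PR's Scala change may round differently.

```python
from typing import Dict


def lag_weighted_split(total_limit: int, lags: Dict[int, int]) -> Dict[int, int]:
    """Split the total limit in proportion to each partition's current lag.

    Hypothetical helper: integer floor division keeps the sum of the caps
    at or below total_limit.
    """
    total_lag = sum(lags.values())
    if total_lag == 0:
        return {partition: 0 for partition in lags}
    return {
        partition: total_limit * lag // total_lag
        for partition, lag in lags.items()
    }


# Same hypothetical lags as the example: one partition 1,001,000 behind,
# the other 31 partitions 1,000 behind.
lags = {0: 1_001_000, **{p: 1_000 for p in range(1, 32)}}
caps = lag_weighted_split(100_000, lags)
print(caps[0], caps[1])    # 96996 96
print(sum(caps.values()))  # 99972
```

With the example lags, the lagging partition's cap is 96,996 of the 100k (about 97%), while each of the other 31 partitions gets 96.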
    
    Assuming all of the 100k messages are retrieved and processed within the 
batch window, the rate calculator will increase the number of messages to 
retrieve in the next batch, until it reaches a new stable point or the backlog 
has been fully processed.
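    To see why the weighting helps recovery, the hypothetical simulation below drains the same example backlog batch by batch under both strategies. It deliberately simplifies: the total limit stays fixed at 100k (the real rate estimator would adjust it), no new messages arrive, and a partition never reads more than its current lag. All function names are illustrative, not Spark APIs.

```python
from typing import Callable, Dict

Split = Callable[[int, Dict[int, int]], Dict[int, int]]


def even_split(total_limit: int, lags: Dict[int, int]) -> Dict[int, int]:
    # Same cap for every partition, regardless of lag.
    per_partition = total_limit // len(lags)
    return {p: per_partition for p in lags}


def lag_weighted_split(total_limit: int, lags: Dict[int, int]) -> Dict[int, int]:
    # Cap proportional to each partition's share of the total lag.
    total_lag = sum(lags.values())
    if total_lag == 0:
        return {p: 0 for p in lags}
    return {p: total_limit * lag // total_lag for p, lag in lags.items()}


def batches_to_drain(split: Split, total_limit: int, lags: Dict[int, int],
                     max_batches: int = 10_000) -> int:
    """Count batches until every partition's lag reaches zero.

    Simplification: fixed total limit and no new arrivals.
    """
    lags = dict(lags)  # do not mutate the caller's dict
    batches = 0
    while any(lags.values()):
        if batches >= max_batches:
            raise RuntimeError("backlog did not drain")
        caps = split(total_limit, lags)
        # A partition reads at most its cap, and never more than it has.
        lags = {p: lag - min(caps[p], lag) for p, lag in lags.items()}
        batches += 1
    return batches


lags = {0: 1_001_000, **{p: 1_000 for p in range(1, 32)}}
print(batches_to_drain(even_split, 100_000, lags))          # 321
print(batches_to_drain(lag_weighted_split, 100_000, lags))  # 11
```

Under these assumptions the even split needs 321 batches, because the lagging partition is stuck at 3125 messages per batch, whereas the lag-weighted split clears the backlog in 11 batches, which matches the lower bound of ceil(1,032,000 / 100,000) = 11.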
    
    We're going to try deploying this internally at Shopify to see if this 
resolves our issue.
    
    @tdas @koeninger @holdenk 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JasonMWhite/spark rate_controller_offsets

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10089
    
----
commit 117ca3c2fbef5d5657bf038474dc5e7c1a588818
Author: Jason White <[email protected]>
Date:   2015-12-01T06:13:49Z

    backpressure rate controller consumes events preferentially from lagging 
partitions

----

