[ https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Glover updated KAFKA-7548:
-------------------------------
    Description: 
Today when we call KafkaConsumer.poll(), it fetches data from Kafka 
asynchronously and puts it into a local buffer (completedFetches).

If we then pause some TopicPartitions and call KafkaConsumer.poll(), we may 
throw away any data buffered locally for those TopicPartitions. Generally, if 
an application pauses some TopicPartitions, it is likely to resume them in the 
near future, which would require the KafkaConsumer to re-issue a fetch for the 
same data it had already buffered. This is wasted effort from the application's 
point of view.
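
To make the scenario concrete, here is a minimal sketch of the pause/poll/resume pattern that triggers the re-fetching. The topic name, group id and other settings are illustrative, not taken from any benchmark; only the public KafkaConsumer API is used.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // illustrative settings
        props.put("group.id", "pause-resume-example");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("high-volume-topic"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                // ... hand records to a slow downstream stage ...

                // Back-pressure: pause every assigned partition while downstream catches up.
                Set<TopicPartition> assigned = consumer.assignment();
                consumer.pause(assigned);

                // poll() must still be called regularly to drive the network layer, but today
                // it discards whatever was already buffered for the paused partitions.
                consumer.poll(Duration.ofMillis(100));

                // On resume, the consumer re-fetches the same offsets it had already buffered.
                consumer.resume(assigned);
            }
        }
    }
}
{code}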

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve performance for stream applications like Samza. We ran a 
benchmark to compare the "before-fix" and "after-fix" versions.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a predefined number of partitions on every poll() call. The partitions to pause 
were chosen randomly for each poll() call (a rough sketch of this loop appears 
after the results table below).
 * Time to run benchmark = 60 seconds
 * max.poll.records = 1
 * Number of TopicPartitions subscribed = 10

||Number of Partitions Paused||Records Consumed (Before Fix)||Records Consumed (After Fix)||
|9|2087|4884693|
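
The sketch below is a rough reconstruction of the benchmark loop described above, assuming a consumer already subscribed to the 10 partitions and configured with max.poll.records=1; the poll timeout and helper structure are assumptions, not the original harness.

{code:java}
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausedPollBenchmark {
    /** Runs for 60 seconds, pausing a random subset of partitions before every poll. */
    static long run(KafkaConsumer<byte[], byte[]> consumer, int partitionsToPause) {
        long consumed = 0;
        long deadline = System.currentTimeMillis() + 60_000;               // benchmark duration: 60s
        while (System.currentTimeMillis() < deadline) {
            List<TopicPartition> assigned = new ArrayList<>(consumer.assignment());
            Collections.shuffle(assigned);                                  // choose partitions randomly
            List<TopicPartition> paused =
                assigned.subList(0, Math.min(partitionsToPause, assigned.size()));

            consumer.pause(paused);
            consumed += consumer.poll(Duration.ofMillis(100)).count();      // max.poll.records = 1 in config
            consumer.resume(paused);
        }
        return consumed;
    }
}
{code}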

 
h4. _Updated June 24, 2019_

I followed up with [~mgharat] on the status of this work since the current 
[patch|https://github.com/apache/kafka/pull/5844] is stale.  This work would 
also benefit the Alpakka Kafka connector, which frequently pauses partitions as 
a means of back-pressure from upstream Akka Streams graph stages.  I've 
reviewed the PR feedback from [~hachikuji] and reimplemented the solution so 
that completed fetches belonging to paused partitions are added back to the 
queue instead of being discarded.  I also rebased against the latest trunk, 
which required further changes because the subscription event handlers had been 
removed from the Fetcher class.
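
For illustration only, the sketch below captures the gist of that change with simplified, made-up types (PausedFetchSketch, CompletedFetch, drainFetchable); the real Fetcher internals differ, but the idea is that completed fetches for paused partitions are parked back on the queue rather than dropped.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

class PausedFetchSketch {
    // Stand-in for the fetcher's buffered response for one partition.
    static class CompletedFetch {
        final TopicPartition partition;
        CompletedFetch(TopicPartition partition) { this.partition = partition; }
    }

    private final Queue<CompletedFetch> completedFetches = new ArrayDeque<>();

    // Drain everything that can be returned to the caller; re-enqueue fetches that
    // belong to currently paused partitions instead of throwing them away.
    List<CompletedFetch> drainFetchable(Set<TopicPartition> pausedPartitions) {
        List<CompletedFetch> fetchable = new ArrayList<>();
        List<CompletedFetch> paused = new ArrayList<>();

        CompletedFetch fetch;
        while ((fetch = completedFetches.poll()) != null) {
            if (pausedPartitions.contains(fetch.partition)) {
                paused.add(fetch);        // keep it: the partition may be resumed soon
            } else {
                fetchable.add(fetch);
            }
        }
        completedFetches.addAll(paused);  // still buffered when the partition is resumed
        return fetchable;
    }
}
{code}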

I created a [sample 
project|https://github.com/seglo/kafka-consumer-tests/tree/seglo/KAFKA-7548] 
that simulates the pause-partition scenario [~mgharat] described above.  It 
uses only the Kafka client, rather than a stream processor like Samza or 
Alpakka Kafka.  Even without setting max.poll.records to 1, there are 
significant gains in the number of records consumed and the amount of traffic 
between the consumer and brokers.  I created two versions of the sample 
project, one based on the latest available Kafka consumer client (2.2.1) and 
one based on the new patch (2.4.0-SNAPSHOT).  Each app has its own topic with 
its own producers and is constrained with cgroups.  For full details of the 
experiment see the [K8s resources in this 
branch|https://github.com/seglo/kafka-consumer-tests/tree/seglo/KAFKA-7548/KAFKA-7548].

[I exported a Grafana snapshot for public 
viewing|https://snapshot.raintank.io/dashboard/snapshot/RDFTsgNvzP5bTmuc8X6hq7vLixp9tUtL?orgId=2]. 
I included a screenshot in the attachments.

 

> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-7548
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7548
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Mayuresh Gharat
>            Assignee: Sean Glover
>            Priority: Major
>         Attachments: image-2019-06-24-01-43-02-034.png
>
>


