[ 
https://issues.apache.org/jira/browse/KAFKA-13368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434418#comment-17434418
 ] 

Pedro Cardoso Silva edited comment on KAFKA-13368 at 10/26/21, 7:16 PM:
------------------------------------------------------------------------

Thank you for your suggestion [~showuon], I was unaware of it. This looks like 
it gets halfway to the solution I was looking for. If I understand correctly, 
it allows the consumer to poll from a subset of the subscribed 
TopicPartitions.
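
To make this concrete, here is a minimal sketch of how an application could 
decide which partitions to leave out of the next poll. It assumes the suggestion 
refers to KafkaConsumer.pause() / resume(), and it is kept free of the Kafka 
client dependency: partitions are plain strings, and the names PauseSelector, 
partitionsToPause and maxLead are hypothetical, not part of any Kafka API.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the Kafka client library.
final class PauseSelector {

    /**
     * Returns the partitions whose last-seen record timestamp is more than
     * maxLead ahead of the slowest partition. maxLead uses the same unit as
     * the record timestamps.
     */
    static Set<String> partitionsToPause(Map<String, Long> lastTimestampByPartition,
                                         long maxLead) {
        Set<String> toPause = new TreeSet<>();
        if (lastTimestampByPartition.isEmpty()) {
            return toPause;
        }
        // Find the partition that is furthest behind in event time.
        long slowest = Long.MAX_VALUE;
        for (long ts : lastTimestampByPartition.values()) {
            slowest = Math.min(slowest, ts);
        }
        // Every partition too far ahead of it becomes a pause candidate.
        for (Map.Entry<String, Long> e : lastTimestampByPartition.entrySet()) {
            if (e.getValue() - slowest > maxLead) {
                toPause.add(e.getKey());
            }
        }
        return toPause;
    }

    public static void main(String[] args) {
        // Topic1 has reached timestamp 50, Topic2 has reached timestamp 50,000.
        Map<String, Long> seen = Map.of("Topic1-0", 50L, "Topic2-0", 50_000L);
        System.out.println(partitionsToPause(seen, 100L)); // prints [Topic2-0]
    }
}
```

In a real consumer loop, the returned set would be mapped to TopicPartition 
objects and passed to pause(), with the remaining assigned partitions passed to 
resume(), before the next call to poll().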

 

Is there a way to control how much to poll from each TopicPartition so that the 
client application does not have to buffer messages between polls? 

Something like 
[https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max.poll.records]
 on a per topic partition basis. This would allow the application to define an 
efficient "peek"-like behavior for each topic partition, and would avoid the 
scenario where, with 2+ topics and a max.poll.records of 100, the 
application has to buffer ~30 messages for each topic that we pause.
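
As an illustration of the buffering this implies today, the sketch below splits 
one polled batch into the records the application handles now and the records 
it has to hold back, given a per-partition cap. It is a plain Java sketch under 
stated assumptions: PerPartitionCap, Split and capByPartition are hypothetical 
names, and partitions and records are stand-in types rather than Kafka classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Kafka client library.
final class PerPartitionCap {

    /** Result of splitting one polled batch. */
    static final class Split<T> {
        final Map<String, List<T>> process = new LinkedHashMap<>();
        final Map<String, List<T>> buffer = new LinkedHashMap<>();
    }

    /** Keeps at most capByPartition[p] records per partition; the rest must be buffered. */
    static <T> Split<T> split(Map<String, List<T>> polled,
                              Map<String, Integer> capByPartition) {
        Split<T> out = new Split<>();
        for (Map.Entry<String, List<T>> e : polled.entrySet()) {
            List<T> records = e.getValue();
            // Partitions without an explicit cap are passed through untouched.
            int cap = capByPartition.getOrDefault(e.getKey(), records.size());
            int cut = Math.max(0, Math.min(cap, records.size()));
            out.process.put(e.getKey(), new ArrayList<>(records.subList(0, cut)));
            out.buffer.put(e.getKey(), new ArrayList<>(records.subList(cut, records.size())));
        }
        return out;
    }

    public static void main(String[] args) {
        // Three topics sharing a batch of 100 records, roughly 33 each.
        Map<String, List<Integer>> polled = new LinkedHashMap<>();
        polled.put("A-0", new ArrayList<>(java.util.Collections.nCopies(33, 0)));
        polled.put("B-0", new ArrayList<>(java.util.Collections.nCopies(33, 0)));
        polled.put("C-0", new ArrayList<>(java.util.Collections.nCopies(34, 0)));

        // "Peek" one record from B and C, take everything from A.
        Split<Integer> s = split(polled, Map.of("B-0", 1, "C-0", 1));
        s.buffer.forEach((p, r) -> System.out.println(p + " buffered: " + r.size()));
    }
}
```

With a per-partition limit applied inside the consumer, the buffer side of this 
split would stay empty and the application would not need to hold those records 
between polls.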


was (Author: pcless):
Thank you for your suggestion [~showuon], I was unaware of it. Having

> Support smart topic polling for consumer with multiple topic subscriptions
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-13368
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13368
>             Project: Kafka
>          Issue Type: Wish
>          Components: consumer
>            Reporter: Pedro Cardoso Silva
>            Priority: Critical
>
> Currently there is no way to control how a Kafka consumer polls messages from 
> a list of topics that it has subscribed to. If I understand correctly, the 
> current approach is a round-robin polling mechanism across all topics that a 
> consumer has subscribed to. 
> This works reasonably well when the consumer's offset is aligned with the 
> latest message offset of the topics. However, if we configure the Kafka 
> consumer to consume from the earliest offset and the topics hold very 
> different numbers of messages, there is no guarantee of, or control over, 
> how the consumer selectively reads from the topics.
> Depending on the use-case it may be useful for the Kafka consumer developer 
> to override this polling mechanism with a custom solution that makes sense 
> for downstream applications.
> Suppose you have 2 or more topics that you want to merge into a 
> single topic, but due to large differences between the topics' message rates 
> you want to control which topics to poll from at a given time. 
> As an example consider 2 topics with the following schemas:
> {code:java}
> Topic1 Schema: {
>    timestamp: Long,
>    key: String,
>    col1: String,
>    col2: String
> }
> Topic2 Schema: { 
>    timestamp: Long,
>    key: String,
>    col3: String,
>    col4: String 
> }
> {code}
> Where Topic1 has 1,000,000 events from timestamp 0 to 1,000 (1000 ev/s) & 
> topic2 has 50,000 events from timestamp 0 to 50,000 (1 ev/s).
> Next we define a Kafka consumer that subscribes to Topic1 & Topic2. In the 
> current situation (round robin), assuming a polling batch of 100 messages,  
> we would read 50,000 from each topic which maps to 50 seconds worth of 
> messages on Topic1 and 50,000 seconds worth of messages on Topic2. 
> If we then try to sort the messages by timestamp we get incorrect results, 
> missing the remaining 950,000 messages from Topic1 that should be inserted 
> between messages 50 & 1,000 of Topic2.
> The workaround is either to buffer the messages from Topic2 or to have 
> 1 Kafka consumer per topic, which has significant overhead from periodic 
> heartbeats, consumer registration in consumer groups, rebalancing, etc. 
> For a couple of topics this approach may be OK, but it does not scale to 
> tens, hundreds or more topics in a subscription.
> The ideal solution would be to extend the Kafka consumer API to allow a user 
> to define how to selectively poll messages from a subscription.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
