xingHu created FLINK-6790:
-----------------------------

             Summary: Flink Kafka Consumer Cannot Round Robin Fetch Records
                 Key: FLINK-6790
                 URL: https://issues.apache.org/jira/browse/FLINK-6790
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.2.1
            Reporter: xingHu


The Java consumer fails to consume messages in a round-robin fashion across
partitions. This can lead to unbalanced consumption.
In our use case we have a set of consumers that can take a significant amount
of time to consume messages off a topic. For this reason, we use the
pause/poll/resume pattern to ensure the consumer session does not time out. The
topic being consumed has been preloaded with messages, so there is a
significant message lag when the consumer is first started. To limit how many
messages are consumed at a time, the consumer has been configured with
max.poll.records=1.
The first observation is that the client receives a large batch of messages
for the first partition it consumes from and consumes all of those messages
before moving on, rather than returning a message from a different partition
on each call to poll.
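The drain-first behavior described above can be reproduced with a toy model.
This is a simplified sketch, not Kafka's actual fetcher: the class
DrainFirstModel, its fetch batch size, and the partition/record names are all
hypothetical. Each fetch pulls a whole batch from one partition into a local
buffer, and each poll() returns one buffered record, mimicking
max.poll.records=1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model, NOT Kafka's real fetcher: a fetch pulls a whole batch
// from a single partition, and poll() returns one buffered record at a time.
public class DrainFirstModel {
    private final Map<Integer, Deque<String>> log = new TreeMap<>(); // records still on the "broker"
    private final Deque<String> buffer = new ArrayDeque<>();         // records already fetched
    private final int fetchBatchSize;

    public DrainFirstModel(int fetchBatchSize) {
        this.fetchBatchSize = fetchBatchSize;
    }

    public void preload(int partition, int count) {
        Deque<String> q = log.computeIfAbsent(partition, p -> new ArrayDeque<>());
        for (int i = 0; i < count; i++) {
            q.add("p" + partition + "-" + i);
        }
    }

    // Returns at most one record per call (like max.poll.records=1), or null when done.
    public String poll() {
        if (buffer.isEmpty()) {
            // Fetch a batch from the first partition that still has data.
            for (Deque<String> q : log.values()) {
                if (!q.isEmpty()) {
                    for (int i = 0; i < fetchBatchSize && !q.isEmpty(); i++) {
                        buffer.add(q.poll());
                    }
                    break;
                }
            }
        }
        return buffer.poll();
    }

    // Two partitions with three records each, fetched in batches of three.
    public static List<String> demoOrder() {
        DrainFirstModel model = new DrainFirstModel(3);
        model.preload(0, 3);
        model.preload(1, 3);
        List<String> order = new ArrayList<>();
        for (String r = model.poll(); r != null; r = model.poll()) {
            order.add(r);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demoOrder()); // [p0-0, p0-1, p0-2, p1-0, p1-1, p1-2]
    }
}
```

Even though every poll() returns a single record, partition 1 is not touched
until the batch already buffered for partition 0 has been fully drained.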
We solved this issue by configuring max.partition.fetch.bytes to be small
enough that the broker returns only a single message on each fetch, although
this would not be feasible if message sizes were highly variable.
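A minimal sketch of the consumer settings described so far, assuming they are
supplied as java.util.Properties, as Flink's Kafka consumers accept. The
bootstrap.servers and group.id values and the 1024-byte
ASSUMED_MAX_MESSAGE_BYTES bound are hypothetical placeholders; the right fetch
size depends on the actual message sizes, which is exactly why this workaround
breaks down when they vary widely.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // ASSUMPTION: hypothetical size bound chosen so a fetch carries roughly one message.
    static final int ASSUMED_MAX_MESSAGE_BYTES = 1024;

    static Properties roundRobinFriendlyProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group
        // Return at most one record per poll() call.
        props.setProperty("max.poll.records", "1");
        // Keep per-partition fetches small so each fetch holds about one message.
        props.setProperty("max.partition.fetch.bytes",
                Integer.toString(ASSUMED_MAX_MESSAGE_BYTES));
        return props;
    }

    public static void main(String[] args) {
        Properties props = roundRobinFriendlyProps();
        System.out.println(props.getProperty("max.poll.records"));          // 1
        System.out.println(props.getProperty("max.partition.fetch.bytes")); // 1024
    }
}
```

In a Flink job these properties would typically be passed to the Kafka source
constructor, e.g. new FlinkKafkaConsumer010<>(topic, schema, props).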
After this change, the consumer largely consumes from a small number of
partitions, usually just two, alternating between them until it exhausts them
before moving on to another partition. This behavior is problematic if the
messages have rough time semantics and need to be processed in roughly time
order across all partitions.
It would be useful if the source had a pluggable API that allowed custom logic
to select which partition to consume from next, enabling the creation of a
round-robin partition consumer.
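The requested pluggable hook could look roughly like the following sketch.
PartitionSelector, RoundRobinPartitionSelector and RoundRobinDemo are
hypothetical names, not part of Flink's or Kafka's API. The sketch assumes
records have already been fetched into per-partition buffers and only decides
which buffer to emit from next.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pluggable hook (not an existing Flink or Kafka API).
interface PartitionSelector {
    // Returns the next partition to emit from, or -1 if every buffer is empty.
    int next(List<Integer> partitions, Map<Integer, Deque<String>> buffers);
}

// Cycles through partitions, skipping those whose buffers are currently empty.
class RoundRobinPartitionSelector implements PartitionSelector {
    private int cursor = 0; // index of the partition to try first on the next call

    @Override
    public int next(List<Integer> partitions, Map<Integer, Deque<String>> buffers) {
        for (int i = 0; i < partitions.size(); i++) {
            int idx = (cursor + i) % partitions.size();
            int partition = partitions.get(idx);
            Deque<String> buf = buffers.get(partition);
            if (buf != null && !buf.isEmpty()) {
                cursor = (idx + 1) % partitions.size(); // start after this one next time
                return partition;
            }
        }
        return -1;
    }
}

public class RoundRobinDemo {
    // Emits every buffered record, letting the selector choose the partition each time.
    static List<String> drain(List<Integer> partitions, Map<Integer, Deque<String>> buffers,
                              PartitionSelector selector) {
        List<String> out = new ArrayList<>();
        for (int p = selector.next(partitions, buffers); p != -1;
                p = selector.next(partitions, buffers)) {
            out.add(buffers.get(p).poll());
        }
        return out;
    }

    // Uneven buffers: partition 0 holds three records, partition 1 one, partition 2 two.
    public static List<String> demoOrder() {
        Map<Integer, Deque<String>> buffers = new HashMap<>();
        buffers.put(0, new ArrayDeque<>(Arrays.asList("p0-0", "p0-1", "p0-2")));
        buffers.put(1, new ArrayDeque<>(Arrays.asList("p1-0")));
        buffers.put(2, new ArrayDeque<>(Arrays.asList("p2-0", "p2-1")));
        return drain(Arrays.asList(0, 1, 2), buffers, new RoundRobinPartitionSelector());
    }

    public static void main(String[] args) {
        System.out.println(demoOrder()); // [p0-0, p1-0, p2-0, p0-1, p2-1, p0-2]
    }
}
```

Round-robin over non-empty buffers interleaves partitions one record at a
time. A selector that instead picked the buffer whose head record is oldest
would approximate the cross-partition time ordering mentioned above, but that
is a different policy plugged into the same hook.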



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
