[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853099#comment-16853099 ]
Richard Yu commented on KAFKA-8421: ----------------------------------- A key issue here is when we stop sending the old records of a consumer's assignment to the user. What we could consider doing is that, whenever we are in rebalance, we check the MemberState if its rebalancing first before sending. If the MemberState is no longer rebalancing, then we just abort any records we mean to send. onPartitionsRevoked/onPartitionsLost could also be be a viable way to abort an ongoing poll operation for old records. > Allow consumer.poll() to return data in the middle of rebalance > --------------------------------------------------------------- > > Key: KAFKA-8421 > URL: https://issues.apache.org/jira/browse/KAFKA-8421 > Project: Kafka > Issue Type: Improvement > Components: consumer > Reporter: Guozhang Wang > Priority: Major > > With KIP-429 in place, today when a consumer is about to send join-group > request its owned partitions may not be empty, meaning that some of its > fetched data can still be returned. Nevertheless, today the logic is strict: > {code} > if (!updateAssignmentMetadataIfNeeded(timer)) { > return ConsumerRecords.empty(); > } > {code} > I.e. if the consumer enters a rebalance it always returns no data. > As an optimization, we can consider letting consumers to still return > messages that still belong to its owned partitions even when it is within a > rebalance, because we know it is safe that no one else would claim those > partitions in this rebalance yet, and we can still commit offsets if, after > this rebalance, the partitions need to be revoked then. > One thing we need to take care though is the rebalance timeout, i.e. when > consumer's processing those records they may not call the next poll() in time > (think: Kafka Streams num.iterations mechanism), which may leads to consumer > dropping out of the group during rebalance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)