RivenSun created KAFKA-13425:
--------------------------------
Summary: KafkaConsumer#pause() will lose its effect after
groupRebalance occurs, which may cause data loss on the consumer side
Key: KAFKA-13425
URL: https://issues.apache.org/jira/browse/KAFKA-13425
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 3.0.0
Reporter: RivenSun
Attachments: architecture_picture.png
h1. Foreword:
To decouple the two processes of polling messages and consuming messages on
the KafkaConsumer side, I use a "poll --> push" architecture model on the
Kafka consumer side.
h2. Architecture
See the attached picture "architecture_picture".
h3. 1) ThreadPoolExecutor
The key parameters of the ThreadPoolExecutor threadPool are (a minimal sketch
follows this list):
h4. (1) workQueue type: ArrayBlockingQueue<Runnable>
h4. (2) handler: an implementation of the RejectedExecutionHandler interface
h4. (3) threadPool.allowCoreThreadTimeOut(true);
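A minimal sketch of this configuration. The queue capacity, thread counts,
keep-alive time, and the choice of AbortPolicy as the RejectedExecutionHandler
are illustrative assumptions, not my production values:
{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// illustrative placeholders, not production values
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1000);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        4,                      // corePoolSize
        8,                      // maximumPoolSize
        60L, TimeUnit.SECONDS,  // keepAliveTime for idle threads
        workQueue,
        // handler: AbortPolicy throws RejectedExecutionException when the
        // queue is full, which publish(...) below turns into a failed push
        new ThreadPoolExecutor.AbortPolicy());
// allow idle core threads to time out and be reclaimed
threadPool.allowCoreThreadTimeOut(true);
{code}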
h3. 2) KafkaConsumer
The disadvantage of this architecture is that if the business side's
onMessage() method is slow to execute, it leads to:
h4. (1) The blockingQueue of the ThreadPoolExecutor accumulates a large number
of tasks, and eventually pushing messages fails.
h4. (2) How should the KafkaConsumer poll() method be handled when the push
fails?
h5. 1. Stop calling poll()
KafkaConsumer needs to set
*configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);*
in order to prevent the heartbeat thread of KafkaConsumer from discovering that
KafkaConsumer has not called the poll() method for a long time and
automatically executing *maybeLeaveGroup("consumer poll timeout has expired.")*.
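A hedged sketch of the relevant consumer configuration for this option; the
bootstrap servers, group id, and deserializers are placeholders assumed for
illustration:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties configs = new Properties();
// placeholders: adjust to your environment
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-push-group");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// effectively disables the "consumer poll timeout has expired" leave-group path
configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(configs);
{code}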
But the most serious consequence of this is that once a rebalance of the group
is triggered for any reason, *the rebalance process of the entire group will
never complete, because this kafkaConsumer does not call the poll() method.
This causes all consumers in the entire group to stop consuming.*
h5. 2. When the push fails, keep calling poll()
The purpose is to keep up the poll method calls of kafkaConsumer, but at this
time KafkaConsumer should not poll any messages, because the downstream
BlockingQueue that stores messages is full. So here we need the help of
KafkaConsumer#pause(...) and KafkaConsumer#resume(...). I named this special
poll method maintainPoll4Rebalance().
h1. maintainPoll4Rebalance: preliminary design ideas
Simple design:
{code:java}
public static void main(String[] args) {
    while (true) {
        try {
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, Object> message : records) {
                // keep polling (without fetching) until the push succeeds
                while (!publish(message)) {
                    try {
                        maintainPoll4Rebalance();
                    } catch (Exception e) {
                        log.error("maintain poll for rebalance with error {}", e.getMessage(), e);
                        CommonUtil.sleep(TimeUnit.SECONDS, 1);
                    }
                }
            }
        } catch (Exception e) {
            log.error("KafkaConsumer poll message has error: {}", e.getMessage(), e);
            CommonUtil.sleep(TimeUnit.MILLISECONDS, ClientConfig.CLIENT_SLEEP_INTERVAL_MS);
        }
    }
}

private boolean publish(Object message) {
    try {
        ...
        // hand the message to the worker pool; a full queue makes the
        // RejectedExecutionHandler throw, turning this into a failed push
        threadPool.execute(() -> onMessage(message));
    } catch (Exception e) {
        log.error("consumer execute failed with error {}", e.getMessage(), e);
        return false;
    }
    return true;
}

private void maintainPoll4Rebalance() {
    try {
        // pause all assigned partitions so this poll() should fetch nothing
        kafkaConsumer.pause(kafkaConsumer.assignment());
        ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        if (!records.isEmpty()) {
            log.error("kafka poll for rebalance discard some record!");
            for (ConsumerRecord<String, Object> consumerRecord : records) {
                if (consumerRecord != null) {
                    log.error("this record need to retry, partition {} ,offset {}",
                            consumerRecord.partition(), consumerRecord.offset());
                }
            }
        }
    } catch (Exception e) {
        log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
    } finally {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }
}
{code}
The above maintainPoll4Rebalance() seems to solve my problem well: when
downstream consumption is blocked, KafkaConsumer keeps calling the poll method
continuously, while avoiding pulling more messages while the push is failing.
But in reality, the following logs appear at runtime:
{code:java}
[main] ERROR ConsumerTest3 - kafka poll for rebalance discard some record!
[main] ERROR ConsumerTest3 - this record need to retry, partition 0 ,offset 36901
{code}
I clearly called kafkaConsumer.pause(kafkaConsumer.assignment()) before
kafkaConsumer#poll. Why does kafkaConsumer still pull messages and cause them
to be lost? They are lost because the consumer has auto-commit of offsets
turned on.
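For context, the loss only happens because auto-commit is enabled; continuing
the hedged config sketch above (the interval value is a placeholder):
{code:java}
// auto-commit periodically commits the offsets returned by poll(), even if
// the application discarded those records; the interval is a placeholder
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
{code}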
h1. RootCause Analysis
KafkaConsumer#poll:
1) updateAssignmentMetadataIfNeeded
2) fetcher.fetchedRecords()
3) fetcher.sendFetches()
These are the three most critical operations in KafkaConsumer#poll.
updateAssignmentMetadataIfNeeded is mainly responsible for group-rebalance
related work, and the root cause lies in the first and second steps.
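For orientation, here is a condensed paraphrase of how these steps relate
inside poll(); this is my simplification, not the verbatim Kafka 3.0 source:
{code:java}
// Condensed paraphrase of KafkaConsumer#poll's control flow (simplified)
do {
    // 1) join/rejoin the group if needed, run rebalance callbacks,
    //    refresh assignment metadata
    updateAssignmentMetadataIfNeeded(timer, false);

    // 2) pollForFetches() internally calls fetcher.sendFetches() and
    //    fetcher.fetchedRecords(): it drains records already buffered in
    //    memory, and this is where paused partitions are filtered out
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

    if (!records.isEmpty()) {
        // 3) pipeline the next fetch request before returning to the caller
        if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
            client.transmitSends();

        return this.interceptors.onConsume(new ConsumerRecords<>(records));
    }
} while (timer.notExpired());
{code}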
h2. 1.updateAssignmentMetadataIfNeeded
We trace directly to ConsumerCoordinator#onJoinPrepare(...)
{code:java}
else {
    switch (protocol) {
        case EAGER:
            // revoke all partitions
            revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
            exception = invokePartitionsRevoked(revokedPartitions);

            subscriptions.assignFromSubscribed(Collections.emptySet());

            break;

        case COOPERATIVE:
            // only revoke those partitions that are not in the subscription any more.
            Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
            revokedPartitions = ownedPartitions.stream()
                    .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                    .collect(Collectors.toSet());

            if (!revokedPartitions.isEmpty()) {
                exception = invokePartitionsRevoked(revokedPartitions);

                ownedPartitions.removeAll(revokedPartitions);
                subscriptions.assignFromSubscribed(ownedPartitions);
            }

            break;
    }
}
{code}
For the value of the protocol instance variable here, see its initialization
code:
{code:java}
// select the rebalance protocol such that:
//   1. only consider protocols that are supported by all the assignors. If there is no common protocols supported
//      across all the assignors, throw an exception.
//   2. if there are multiple protocols that are commonly supported, select the one with the highest id (i.e. the
//      id number indicates how advanced the protocol is).
// we know there are at least one assignor in the list, no need to double check for NPE
if (!assignors.isEmpty()) {
    List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());

    for (ConsumerPartitionAssignor assignor : assignors) {
        supportedProtocols.retainAll(assignor.supportedProtocols());
    }

    if (supportedProtocols.isEmpty()) {
        throw new IllegalArgumentException("Specified assignors " +
            assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) +
            " do not have commonly supported rebalance protocol");
    }

    Collections.sort(supportedProtocols);

    protocol = supportedProtocols.get(supportedProtocols.size() - 1);
} else {
    protocol = null;
}
{code}
After a simple analysis, we can see that as long as every assignor's
supportedProtocols contains RebalanceProtocol.COOPERATIVE, the protocol value
will be COOPERATIVE; otherwise it will be EAGER.
But checking the ConsumerPartitionAssignor interface, I found that all of its
implementation classes except CooperativeStickyAssignor use the default value:
{code:java}
/**
 * Indicate which rebalance protocol this assignor works with;
 * By default it should always work with {@link RebalanceProtocol#EAGER}.
 */
default List<RebalanceProtocol> supportedProtocols() {
    return Collections.singletonList(RebalanceProtocol.EAGER);
}
{code}
So the code takes the EAGER branch:
{code:java}
case EAGER:
    // revoke all partitions
    revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
    exception = invokePartitionsRevoked(revokedPartitions);

    subscriptions.assignFromSubscribed(Collections.emptySet());

    break;
{code}
The problem is here:
*subscriptions.assignFromSubscribed(Collections.emptySet()) clears the
assignment in my subscriptions, and with it {color:#FF0000}clears the paused
mark of every TopicPartition.{color}*
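To see why the mark disappears, here is a condensed paraphrase of
SubscriptionState#assignFromSubscribed; this is my simplification, not the
verbatim source:
{code:java}
// Condensed paraphrase of SubscriptionState (simplified). The paused flag
// lives in a per-partition TopicPartitionState object:
public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments) {
    ...
    Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
    for (TopicPartition tp : assignments) {
        TopicPartitionState state = this.assignment.stateValue(tp);
        if (state == null)
            state = new TopicPartitionState(); // fresh state: paused == false
        assignedPartitionStates.put(tp, state);
    }
    ...
    this.assignment.set(assignedPartitionStates);
}
// onJoinPrepare(EAGER) calls assignFromSubscribed(Collections.emptySet()),
// which wipes every TopicPartitionState. When onJoinComplete later calls
// assignFromSubscribed(assignedPartitions), each partition gets a brand-new
// state object, so its paused flag is back to the default false.
{code}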
h2. 2.fetcher.fetchedRecords()
There is no need to go deep into the code here: fetchedRecords verifies the
TopicPartition of each in-memory message set (CompletedFetch) with three checks
(a condensed paraphrase follows this list):
h3. 1) if (subscriptions.isPaused(nextInLineFetch.partition))
h3. 2) if (!subscriptions.isAssigned(completedFetch.partition))
h3. 3) if (!subscriptions.isFetchable(completedFetch.partition))
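A condensed paraphrase of this gatekeeping loop in Fetcher#fetchedRecords; my
simplification, not the verbatim source:
{code:java}
// Condensed paraphrase of Fetcher#fetchedRecords (simplified)
while (recordsRemaining > 0) {
    if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
        CompletedFetch completedFetch = completedFetches.peek();
        if (completedFetch == null)
            break;
        // checks 2) and 3) happen while initializing the completed fetch:
        // fetches for partitions that are no longer assigned or no longer
        // fetchable are dropped here
        nextInLineFetch = initializeCompletedFetch(completedFetch);
        completedFetches.poll();
    } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
        // check 1): paused partitions are skipped, their records stay buffered
        pausedCompletedFetches.add(nextInLineFetch);
        nextInLineFetch = null;
    } else {
        // all checks passed: hand records to the application
        List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
        ...
    }
}
{code}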
The problem is: if a poll(...) call completes the
updateAssignmentMetadataIfNeeded operation within the pollTimer specified by
the user, that method returns true; the paused flag of the TopicPartition has
already been cleared inside updateAssignmentMetadataIfNeeded; and if the
KafkaConsumer's assignment after the completed rebalance still holds this
TopicPartition, then all of the TopicPartition checks above will pass.
And because the *{color:#FF0000}nextInLineFetch variable in KafkaConsumer
memory still stores that TopicPartition's messages,{color} the
KafkaConsumer#poll(...) method will still return those messages even after
pause(...) has been called. Even if you always call pause(...) before each
poll(...), it will still return the messages of this TopicPartition.*
If the business side cannot process the messages at this moment and the
KafkaConsumer has the auto-commit offset switch turned on, the messages are
lost on the consumer side; at most max.poll.records messages can be lost per
poll.
{code:java}
try {
    kafkaConsumer.pause(kafkaConsumer.assignment());
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
} catch (Exception e) {
    log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
} finally {
    kafkaConsumer.resume(kafkaConsumer.assignment());
}
{code}
h1. maintainPoll4Rebalance() temporary solution
The paused mark of each TopicPartition is restored in
ConsumerRebalanceListener#onPartitionsAssigned(...):
{code:java}
private boolean maintainPoll4Rebalance;

private void initKafkaConsumer() {
    kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            confirmMessageSync();
            log.info("consumer on partition revoked!");
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            try {
                // re-pause the new assignment if we are inside maintainPoll4Rebalance()
                if (maintainPoll4Rebalance) {
                    kafkaConsumer.pause(kafkaConsumer.assignment());
                }
            } catch (Exception e) {
                log.error("consumer onPartitionsAssigned failed with error:{}!", e.getMessage(), e);
            }
            log.info("consumer on partition assigned!");
        }
    });
}

private void maintainPoll4Rebalance() {
    try {
        maintainPoll4Rebalance = true;
        kafkaConsumer.pause(kafkaConsumer.assignment());
        ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        if (!records.isEmpty()) {
            log.error("kafka poll for rebalance discard some record!");
            for (ConsumerRecord<String, Object> consumerRecord : records) {
                if (consumerRecord != null) {
                    log.error("this record need to retry, partition {} ,offset {}",
                            consumerRecord.partition(), consumerRecord.offset());
                }
            }
        }
    } catch (Exception e) {
        log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
    } finally {
        maintainPoll4Rebalance = false;
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }
}
{code}
After testing, this temporarily solves the problem: after calling
kafkaConsumer#pause(...), kafkaConsumer.poll(...) definitely no longer returns
messages for the paused TopicPartitions.
h1. Suggestions
h2. 1. Precise semantics of kafkaConsumer#pause(…)
First, look at the javadoc comments on this method:
{code:java}
/**
 * Suspend fetching from the requested partitions. Future calls to poll(Duration)
 * will not return any records from these partitions until they have been resumed
 * using resume(Collection). Note that this method does not affect partition
 * subscription. In particular, it does not cause a group rebalance when automatic
 * assignment is used.
 *
 * @param partitions The partitions which should be paused
 * @throws IllegalStateException if any of the provided partitions are not currently
 *         assigned to this consumer
 */
@Override
public void pause(Collection<TopicPartition> partitions) {
    acquireAndEnsureOpen();
    try {
        log.debug("Pausing partitions {}", partitions);
        for (TopicPartition partition: partitions) {
            subscriptions.pause(partition);
        }
    } finally {
        release();
    }
}
{code}
*Nothing in these comments tells us that {color:#FF0000}the pause method will
lose its effect after a groupRebalance.{color}*
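As one concrete example, the missing caveat could be documented roughly like
this; my proposed wording, not existing Kafka javadoc:
{code:java}
/**
 * Suspend fetching from the requested partitions. ...
 * <p>
 * Proposed addition: when a group rebalance occurs, the paused state of the
 * assigned partitions is discarded. Callers that need pause() to survive a
 * rebalance must currently re-pause the partitions in
 * {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}.
 */
{code}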
h2. 2. When we execute invokePartitionsRevoked(revokedPartitions), should we
consider cleaning up the messages in KafkaConsumer memory that correspond to
revokedPartitions?
If they are cleaned up, the cost is: after resume(...), kafkaConsumer needs to
re-initiate FetchRequests for the resumed partitions, which brings additional
network transfers.
h2. 3. We had better make the pause(...) method on the KafkaConsumer side
unaffected by groupRebalance
h3. 1) When the rebalance starts to prepare, add new logic to
ConsumerCoordinator#onJoinPrepare(...)
Before executing invokePartitionsRevoked(...) and
subscriptions.assignFromSubscribed(...), save the currently paused partitions
from the subscriptions.assignment of the current KafkaConsumer into
customerPausedPartitions, which should be an instance variable of
ConsumerCoordinator.
{code:java}
// New code: save the paused partitions before the two existing calls below
customerPausedPartitions = subscriptions.pausedPartitions();

exception = invokePartitionsRevoked(...);
subscriptions.assignFromSubscribed(...);
{code}
h3. 2) After the rebalance is completed, add new logic to
ConsumerCoordinator#onJoinComplete(...)
{code:java}
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);
    ......
    subscriptions.assignFromSubscribed(assignedPartitions);

    // New code here: restore the paused marks saved in onJoinPrepare
    if (customerPausedPartitions != null && customerPausedPartitions.size() != 0) {
        customerPausedPartitions.forEach(topicPartition -> {
            if (subscriptions.isAssigned(topicPartition))
                subscriptions.pause(topicPartition);
        });
        customerPausedPartitions = null;
    }

    // Add partitions that were not previously owned but are now assigned
    firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
    ......
}
{code}
*The above is just a first draft of the modified code. It only guarantees that,
after a rebalance, the topicPartitions still held in KafkaConsumer's new
assignment keep their paused mark.*
*{color:#FF0000}Note{color}: If the new assignment of the kafkaConsumer no
longer contains topicPartitions that were paused before the rebalance, the
paused mark of those topicPartitions is lost forever on the kafkaConsumer side,
even if a future rebalance assigns these partitions to this kafkaConsumer
again.*
--
This message was sent by Atlassian Jira
(v8.3.4#803005)