[
https://issues.apache.org/jira/browse/KAFKA-13425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17436729#comment-17436729
]
RivenSun commented on KAFKA-13425:
----------------------------------
Hi [~showuon] and [~guozhang],
Do you have any suggestions for this issue?
Thanks.
> KafkaConsumer#pause() will lose its effect after a group rebalance occurs,
> which may cause data loss on the consumer side
> ------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13425
> URL: https://issues.apache.org/jira/browse/KAFKA-13425
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.0.0
> Reporter: RivenSun
> Priority: Major
> Attachments: architecture_picture.png
>
>
>
>
> h1. Foreword:
> Because I want to decouple the polling of messages from the consumption of
> messages on the KafkaConsumer side, I use a "poll --> push" architecture
> model on the Kafka consumer side.
> h2. Architecture
> See the attached picture "architecture_picture".
> h3. 1) ThreadPoolExecutor
> The key parameters of the ThreadPoolExecutor threadPool are (a minimal
> sketch follows below):
> h4. (1) workQueue: an ArrayBlockingQueue<Runnable>
> h4. (2) handler: an implementation of the RejectedExecutionHandler interface
> h4. (3) threadPool.allowCoreThreadTimeOut(true)
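> (A minimal sketch of such a pool; the queue capacity, pool sizes and
> keep-alive value are assumptions for illustration, not values from this
> issue.)
> {code:java}
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
>
> // (1) a bounded workQueue, (2) a rejection handler that throws
> // RejectedExecutionException when the queue is full, (3) idle core
> // threads are also allowed to time out.
> ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
>         4,                                     // core pool size (assumed)
>         8,                                     // max pool size (assumed)
>         60, TimeUnit.SECONDS,                  // keep-alive for idle threads
>         new ArrayBlockingQueue<>(1000),        // (1) bounded work queue
>         new ThreadPoolExecutor.AbortPolicy()); // (2) RejectedExecutionHandler
> threadPool.allowCoreThreadTimeOut(true);       // (3)
> {code}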
>
> h3. 2) KafkaConsumer
> The disadvantage of this architecture is that if the business side's
> onMessage() method takes a long time to execute:
> h4. (1) The blocking queue of the ThreadPoolExecutor accumulates a large
> number of tasks, and eventually pushing messages fails.
> h4. (2) How should the KafkaConsumer poll() method be handled when the push
> fails?
> h5. 1. Stop calling poll()
> The KafkaConsumer needs to set
> *configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);*
> to prevent the KafkaConsumer heartbeat thread from detecting that poll() has
> not been called for a long time and automatically executing
> *maybeLeaveGroup("consumer poll timeout has expired.")*.
> But the most serious consequence of this is that once a rebalance of the
> group is triggered for any reason, *the rebalance of the entire group will
> never complete because this kafkaConsumer does not call poll(). That causes
> all consumers in the group to stop consuming.* A minimal sketch of this
> configuration follows.
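>
> (The bootstrap servers and group id below are assumed placeholders; only the
> max.poll.interval.ms line comes from this issue.)
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
>
> Properties configs = new Properties();
> configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
> configs.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");       // assumed
> // Effectively disable the poll-interval check, so the heartbeat thread never
> // executes maybeLeaveGroup("consumer poll timeout has expired.").
> configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
> {code}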
> h5. 2. Keep calling poll() when the push fails
> The goal is to keep up the poll() calls of the kafkaConsumer, but the
> KafkaConsumer should not actually fetch any messages at this time, because
> the downstream BlockingQueue that stores the messages is full. So we need
> the help of KafkaConsumer#pause(...) and KafkaConsumer#resume(...). I named
> this special poll method maintainPoll4Rebalance().
>
> h1. maintainPoll4Rebalance: preliminary design
> A simple design in code:
> {code:java}
> public static void main(String[] args) {
>     while (true) {
>         try {
>             ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>             for (ConsumerRecord<String, Object> record : records) {
>                 // retry the push until the downstream thread pool accepts it
>                 while (!publish(record)) {
>                     try {
>                         maintainPoll4Rebalance();
>                     } catch (Exception e) {
>                         log.error("maintain poll for rebalance with error {}", e.getMessage(), e);
>                         CommonUtil.sleep(TimeUnit.SECONDS, 1);
>                     }
>                 }
>             }
>         } catch (Exception e) {
>             log.error("KafkaConsumer poll message has error: {}", e.getMessage(), e);
>             CommonUtil.sleep(TimeUnit.MILLISECONDS, ClientConfig.CLIENT_SLEEP_INTERVAL_MS);
>         }
>     }
> }
>
> private boolean publish(Object message) {
>     try {
>         ...
>         // hand the message off to the business thread pool
>         threadPool.execute(() -> onMessage(message));
>     } catch (RejectedExecutionException e) {
>         // the bounded work queue is full: the push failed
>         log.error("consumer execute failed with error {}", e.getMessage(), e);
>         return false;
>     } catch (Exception e) {
>         log.error("consumer execute failed with error {}", e.getMessage(), e);
>         return false;
>     }
>     return true;
> }
>
> private void maintainPoll4Rebalance() {
>     try {
>         // pause all currently assigned partitions, so this poll() is expected
>         // to return no records and only keeps the group membership alive
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>         if (!records.isEmpty()) {
>             log.error("kafka poll for rebalance discard some record!");
>             for (ConsumerRecord<String, Object> consumerRecord : records) {
>                 if (consumerRecord != null) {
>                     log.error("this record need to retry, partition {}, offset {}",
>                             consumerRecord.partition(), consumerRecord.offset());
>                 }
>             }
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error: {}", e.getMessage(), e);
>     } finally {
>         kafkaConsumer.resume(kafkaConsumer.assignment());
>     }
> }
> {code}
>
>
> The maintainPoll4Rebalance() above seems to solve my problem nicely: when
> downstream consumption is blocked, the KafkaConsumer keeps calling the poll
> method continuously, while avoiding pulling further messages while the push
> is failing.
> But in reality, the following logs appear at runtime:
> {code:java}
> [main] ERROR ConsumerTest3 - kafka poll for rebalance discard some record!
> [main] ERROR ConsumerTest3 - this record need to retry, partition 0, offset 36901
> {code}
> I clearly call kafkaConsumer.pause(kafkaConsumer.assignment()) before every
> kafkaConsumer#poll. Why does the kafkaConsumer still return messages, and
> why are they lost? They are lost because the consumer has automatic offset
> commits enabled.
>
> h1. RootCause Analysis
>
> KafkaConsumer#poll consists of three key operations:
> 1) updateAssignmentMetadataIfNeeded
> 2) fetcher.fetchedRecords()
> 3) fetcher.sendFetches()
> updateAssignmentMetadataIfNeeded is mainly responsible for the group
> rebalance related work, and the root cause lies in the first and second
> steps.
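>
> (For orientation, a heavily condensed paraphrase of that call structure; it
> is not the verbatim Kafka source, and the fetch path is inlined here.)
> {code:java}
> // Condensed paraphrase of KafkaConsumer#poll (not verbatim source):
> private ConsumerRecords<K, V> poll(final Timer timer) {
>     do {
>         // 1) drive rebalance-related work (join/sync group, revoke/assign callbacks)
>         updateAssignmentMetadataIfNeeded(timer);
>
>         // 2) first drain records that were already fetched into memory
>         final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>         if (!records.isEmpty()) {
>             // 3) pipeline the next FetchRequests before returning to the caller
>             fetcher.sendFetches();
>             return new ConsumerRecords<>(records);
>         }
>     } while (timer.notExpired());
>     return ConsumerRecords.empty();
> }
> {code}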
>
> h2. 1. updateAssignmentMetadataIfNeeded
>
> We trace directly into ConsumerCoordinator#onJoinPrepare(...):
> {code:java}
> else {
>     switch (protocol) {
>         case EAGER:
>             // revoke all partitions
>             revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>             exception = invokePartitionsRevoked(revokedPartitions);
>
>             subscriptions.assignFromSubscribed(Collections.emptySet());
>             break;
>
>         case COOPERATIVE:
>             // only revoke those partitions that are not in the subscription any more.
>             Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>             revokedPartitions = ownedPartitions.stream()
>                     .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
>                     .collect(Collectors.toSet());
>
>             if (!revokedPartitions.isEmpty()) {
>                 exception = invokePartitionsRevoked(revokedPartitions);
>
>                 ownedPartitions.removeAll(revokedPartitions);
>                 subscriptions.assignFromSubscribed(ownedPartitions);
>             }
>             break;
>     }
> }
> {code}
>
> For the value of the protocol instance variable here, see its
> initialization code:
> {code:java}
> // select the rebalance protocol such that:
> //   1. only consider protocols that are supported by all the assignors. If there is no common
> //      protocol supported across all the assignors, throw an exception.
> //   2. if there are multiple protocols that are commonly supported, select the one with the
> //      highest id (i.e. the id number indicates how advanced the protocol is).
> // we know there is at least one assignor in the list, no need to double check for NPE
> if (!assignors.isEmpty()) {
>     List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
>
>     for (ConsumerPartitionAssignor assignor : assignors) {
>         supportedProtocols.retainAll(assignor.supportedProtocols());
>     }
>
>     if (supportedProtocols.isEmpty()) {
>         throw new IllegalArgumentException("Specified assignors " +
>             assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) +
>             " do not have commonly supported rebalance protocol");
>     }
>
>     Collections.sort(supportedProtocols);
>     protocol = supportedProtocols.get(supportedProtocols.size() - 1);
> } else {
>     protocol = null;
> }
> {code}
>
> After a simple analysis we can see that as long as supportedProtocols
> contains the RebalanceProtocol.COOPERATIVE element, the protocol value will
> be COOPERATIVE; otherwise it will be EAGER.
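> (For example, only a consumer configured along these lines ends up with the
> COOPERATIVE protocol; the surrounding setup is assumed.)
> {code:java}
> // Opt in to the only built-in assignor that supports COOPERATIVE;
> // with the default assignors the selected protocol stays EAGER.
> Properties configs = new Properties();
> configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>         CooperativeStickyAssignor.class.getName());
> {code}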
> But checking the ConsumerPartitionAssignor interface, I found that all of
> its implementation classes except CooperativeStickyAssignor use the default
> value:
> {code:java}
> /**
>  * Indicate which rebalance protocol this assignor works with;
>  * by default it should always work with ConsumerPartitionAssignor.RebalanceProtocol.EAGER.
>  */
> default List<RebalanceProtocol> supportedProtocols() {
>     return Collections.singletonList(RebalanceProtocol.EAGER);
> }
> {code}
>
> So the code takes the EAGER branch:
> {code:java}
> case EAGER:
>     // revoke all partitions
>     revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>     exception = invokePartitionsRevoked(revokedPartitions);
>
>     subscriptions.assignFromSubscribed(Collections.emptySet());
>     break;
> {code}
>
>
> This is where the problem lies:
> *subscriptions.assignFromSubscribed(Collections.emptySet()) clears the
> assignment in my subscriptions, and with it {color:#FF0000}clears the paused
> mark of every TopicPartition.{color}*
> h2. 2. fetcher.fetchedRecords()
> There is no need to go deep into the code here: for each in-memory message
> set (CompletedFetch), fetchedRecords validates the corresponding
> TopicPartition with the following checks (a simplified paraphrase of these
> checks follows after this list):
> h3. 1) if (subscriptions.isPaused(nextInLineFetch.partition))
> h3. 2) if (!subscriptions.isAssigned(completedFetch.partition))
> h3. 3) if (!subscriptions.isFetchable(completedFetch.partition))
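>
> {code:java}
> // A simplified paraphrase combining the relevant checks in
> // Fetcher#fetchedRecords (not the verbatim Kafka source): records are only
> // returned while the partition is assigned, fetchable and not paused.
> if (subscriptions.isPaused(nextInLineFetch.partition)) {
>     // paused: keep the completed fetch buffered for a later poll
>     pausedCompletedFetches.add(nextInLineFetch);
>     nextInLineFetch = null;
> } else if (!subscriptions.isAssigned(completedFetch.partition)) {
>     // no longer assigned (e.g. revoked by a rebalance): discard silently
>     log.debug("Not returning fetched records for partition {} since it is no longer assigned",
>             completedFetch.partition);
> } else if (!subscriptions.isFetchable(completedFetch.partition)) {
>     // not fetchable (this check also covers paused partitions)
>     log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
>             completedFetch.partition);
> }
> {code}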
>
> The problem is: if, within the pollTimer specified by the user, a poll(...)
> call completes the updateAssignmentMetadataIfNeeded operation, then
> updateAssignmentMetadataIfNeeded returns true, the paused flag of the
> TopicPartition has already been cleared inside
> updateAssignmentMetadataIfNeeded, and if the assignment of the KafkaConsumer
> after the completed rebalance still holds this TopicPartition, all of the
> TopicPartition checks above will pass.
> And since the *{color:#FF0000}nextInLineFetch variable in KafkaConsumer
> memory still stores messages of this TopicPartition,{color} the
> KafkaConsumer#poll(...) method will still return those messages even though
> pause(...) was called. Even if you always call pause(...) before each
> poll(...), poll will return the messages of this TopicPartition.*
> If the business side cannot process the messages at this moment and the
> KafkaConsumer has automatic offset commits enabled, the messages are lost on
> the consumer side; up to max.poll.records messages can be lost per poll.
> {code:java}
> try {
>     kafkaConsumer.pause(kafkaConsumer.assignment());
>     ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
>
> } catch (Exception e) {
>     log.error("maintain poll for rebalance with error: {}", e.getMessage(), e);
> } finally {
>     kafkaConsumer.resume(kafkaConsumer.assignment());
> }
> {code}
>
>
>
>
> h1. maintainPoll4Rebalance(): temporary workaround
>
> The paused mark of the TopicPartitions is restored in
> ConsumerRebalanceListener#onPartitionsAssigned(...):
> {code:java}
> private boolean maintainPoll4Rebalance;
>
> private void initKafkaConsumer() {
>     kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() {
>         @Override
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>             confirmMessageSync();
>             log.info("consumer on partition revoked!");
>         }
>
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             try {
>                 // re-apply the paused mark that the rebalance cleared
>                 if (maintainPoll4Rebalance) {
>                     kafkaConsumer.pause(kafkaConsumer.assignment());
>                 }
>             } catch (Exception e) {
>                 log.error("consumer onPartitionsAssigned failed with error: {}!", e.getMessage(), e);
>             }
>             log.info("consumer on partition assigned!");
>         }
>     });
> }
>
> private void maintainPoll4Rebalance() {
>     try {
>         maintainPoll4Rebalance = true;
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>         if (!records.isEmpty()) {
>             log.error("kafka poll for rebalance discard some record!");
>             for (ConsumerRecord<String, Object> consumerRecord : records) {
>                 if (consumerRecord != null) {
>                     log.error("this record need to retry, partition {}, offset {}",
>                             consumerRecord.partition(), consumerRecord.offset());
>                 }
>             }
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error: {}", e.getMessage(), e);
>     } finally {
>         maintainPoll4Rebalance = false;
>         kafkaConsumer.resume(kafkaConsumer.assignment());
>     }
> }
> {code}
>
> After testing, this temporarily works around the problem: once
> kafkaConsumer#pause(...) has been called, kafkaConsumer.poll(...) no longer
> returns messages for the corresponding TopicPartitions.
> h1. Suggestions
> h2. 1. Make the semantics of kafkaConsumer#pause(...) precise
> First, look at the Javadoc of this method:
> {code:java}
> /**
>  * Suspend fetching from the requested partitions. Future calls to
>  * poll(Duration) will not return any records from these partitions until
>  * they have been resumed using resume(Collection). Note that this method
>  * does not affect partition subscription. In particular, it does not cause
>  * a group rebalance when automatic assignment is used.
>  *
>  * @param partitions The partitions which should be paused
>  * @throws IllegalStateException if any of the provided partitions are not
>  *         currently assigned to this consumer
>  */
> @Override
> public void pause(Collection<TopicPartition> partitions) {
>     acquireAndEnsureOpen();
>     try {
>         log.debug("Pausing partitions {}", partitions);
>         for (TopicPartition partition : partitions) {
>             subscriptions.pause(partition);
>         }
>     } finally {
>         release();
>     }
> }
> {code}
> *Nothing in this Javadoc tells us that {color:#FF0000}the pause method loses
> its effect after a group rebalance.{color}* A sketch of the missing caveat
> follows.
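>
> (A possible wording for that caveat; this is only a sketch of what the
> Javadoc could add, not an official proposal.)
> {code:java}
> /**
>  * ...
>  * Note that the paused state is tied to the current assignment: if a group
>  * rebalance revokes and re-assigns partitions, previously paused partitions
>  * may lose their paused mark, and a subsequent poll(Duration) may return
>  * records from them again.
>  */
> {code}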
>
> h2. 2. When we execute invokePartitionsRevoked(revokedPartitions), should we
> consider cleaning up the messages buffered in KafkaConsumer memory for the
> revokedPartitions?
> If they are cleaned up, the cost is: after resume(...), the kafkaConsumer
> has to re-issue FetchRequests for the resumed partitions, which costs
> additional network round trips. A sketch of such a cleanup follows.
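>
> (A minimal sketch of what that cleanup could look like in
> ConsumerCoordinator#onJoinPrepare; clearBufferedDataForRevokedPartitions is
> a hypothetical helper on the Fetcher, named here only for illustration.)
> {code:java}
> case EAGER:
>     // revoke all partitions
>     revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>     exception = invokePartitionsRevoked(revokedPartitions);
>
>     // hypothetical: drop the CompletedFetches and nextInLineFetch entries
>     // that belong to the revoked partitions, so that a later poll() can
>     // never return them
>     fetcher.clearBufferedDataForRevokedPartitions(revokedPartitions);
>
>     subscriptions.assignFromSubscribed(Collections.emptySet());
>     break;
> {code}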
>
> h2. 3. Better: support a pause(...) on the KafkaConsumer side that is not
> affected by group rebalances
>
> h3. 1) When the rebalance starts to prepare, add new logic to
> ConsumerCoordinator#onJoinPrepare(...)
> Before executing invokePartitionsRevoked(...) and
> subscriptions.assignFromSubscribed(...), capture the currently paused
> partitions from the subscriptions.assignment of the current KafkaConsumer as
> customerPausedPartitions, an instance variable of ConsumerCoordinator.
> {code:java}
> // new code: remember the paused partitions before they are revoked
> customerPausedPartitions = subscriptions.pausedPartitions();
> // ... added in front of the following two existing calls
> exception = invokePartitionsRevoked(...);
> subscriptions.assignFromSubscribed(...);
> {code}
>
>
> h3. 2) After the rebalance is completed, add new logic to
> ConsumerCoordinator#onJoinComplete(...)
> {code:java}
> protected void onJoinComplete(int generation,
>                               String memberId,
>                               String assignmentStrategy,
>                               ByteBuffer assignmentBuffer) {
>     log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);
>     ......
>     subscriptions.assignFromSubscribed(assignedPartitions);
>
>     // Add new code here: re-apply the paused mark to the partitions that
>     // survived the rebalance
>     if (customerPausedPartitions != null && !customerPausedPartitions.isEmpty()) {
>         customerPausedPartitions.forEach(topicPartition -> {
>             if (subscriptions.isAssigned(topicPartition))
>                 subscriptions.pause(topicPartition);
>         });
>         customerPausedPartitions = null;
>     }
>
>     // Add partitions that were not previously owned but are now assigned
>     firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
>     ......
> }
> {code}
>
>
> *The above is only a first draft of the modified code. It can only guarantee
> that, after a rebalance, the topicPartitions still held in the new
> assignment of the KafkaConsumer keep their paused mark.*
> *{color:#FF0000}Note{color}: if the new assignment of the kafkaConsumer no
> longer contains topicPartitions that were paused before the rebalance, the
> paused mark of those topicPartitions is lost forever on that kafkaConsumer,
> even if a future rebalance assigns those partitions to it again.*
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)