[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13310.
-----------------------------------
    Fix Version/s: 3.2.0
       Resolution: Fixed

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic of  Broker‘s side increase sharply
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13310
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13310
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.8.1
>         Environment: prod
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>             Fix For: 3.2.0
>
>         Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png, 
> ThirdDebugLog1.png, ThirdDebugLog2.png, brokerCpu.png, brokerNetBytes.png, 
> kafkaConsumerLog.png
>
>
> h2. Foreword
>       Because our consumers' consumption logic is sometimes heavier, we refer 
> to the configuration of Kafka stream 
> [https://kafka.apache.org/documentation/#upgrade_10201_notable]
>  Set max.poll.interval.ms to Integer.MAX_VALUE
>  Our consumers have adopted method : 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  
> h2. Recurrence of the problem scene
> operate steps are
>  (1) Test environment Kafka cluster: three brokers
>  (2) Topics conforming to regular expressions include rivenTest1, rivenTest2, 
> and rivenTest88
>  (3) Only one consumer is needed, group.id is "rivenReassign", 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  (4) At the beginning, the group status is stable, and everything is normal 
> for consumers, then I delete topic: rivenTest88
>  
> h2. Phenomenon
>       Problem phenomenon
>   (1) The consumer is blocked in the poll method, no longer consume any 
> messages, and the consumer log is always printing
>  [main] WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer 
> clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit 
> failed on partition rivenTest88-1 at offset 0: This server does not host this 
> topic-partition.
>  (2) The describe consumerGroup interface of Adminclient  has always timed 
> out, and the group status is no longer stable
>  (3) The cpu and traffic of the broker are *significantly increased*
>  
>  
> h2. Problem tracking
>    By analyzing the kafkaConsumer code, the version is 2.8.1.
>  I found that you introduced the waitForJoinGroup variable in the 
> updateAssignmentMetadataIfNeeded method. For the reason, I attached the 
> comment on the method: "try to update assignment metadata BUT do not need to 
> block on the timer for join group". See as below:
>  
> {code:java}
>  if (includeMetadataInTimeout) {
>     // try to update assignment metadata BUT do not need to block on the 
> timer for join group
>     updateAssignmentMetadataIfNeeded(timer, false);
> } else {
>     while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), 
> true)) {
>         log.warn("Still waiting for metadata");
>     }
> }{code}
>  
>  
> By tracing the code back layer by layer, it is found that the function of 
> this variable is to construct a time.timer(0L) and pass it back to the method 
> joinGroupIfNeeded (final Timer timer) in AbstractCoordinator. See as below:
> {code:java}
> // if not wait for join group, we would just use a timer of 0
>       if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
> // since we may use a different timer in the callee, we'd still need 
> // to update the original timer's current time after the call 
>       timer.update(time.milliseconds()); 
>       return false; 
> }
> {code}
>  But you will find that there is a submethod onJoinPrepare in the method 
> stack of joinGroupIfNeeded, and then there is a line of code in the 
> onJoinPrepare method
>  maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)), 
> the value of rebalanceConfig.rebalanceTimeoutMs is actually 
> max.poll.interval.ms.
>  Finally, I tracked down ConsumerCoordinator's method 
> commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer)
>  The input parameter offsets is subscriptions.allConsumed(), when I delete 
> the topic: rivenTest88, commitOffsetsSync(Map<TopicPartition, 
> OffsetAndMetadata> offsets, Timer timer) method will *fall into an infinite 
> loop! !*
> {code:java}
> public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> 
> offsets, Timer timer) {
>  invokeCompletedOffsetCommitCallbacks();
>  if (offsets.isEmpty())
>  return true;
>  do {
>  if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
>  return false;
>  }
>  RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
>  client.poll(future, timer);
>  // We may have had in-flight offset commits when the synchronous commit 
> began. If so, ensure that
>  // the corresponding callbacks are invoked prior to returning in order to 
> preserve the order that
>  // the offset commits were applied.
>  invokeCompletedOffsetCommitCallbacks();
>  if (future.succeeded()) {
>  if (interceptors != null)
>  interceptors.onCommit(offsets);
>  return true;
>  }
>  if (future.failed() && !future.isRetriable())
>  throw future.exception();
>  timer.sleep(rebalanceConfig.retryBackoffMs);
>  } while (timer.notExpired());
>  return false;
> }{code}
>  
>  
> *The reason for the endless loop is:*
>  (1) The expiration time of the timer is too long, which is 
> max.poll.interval.ms
>  (2) The offsets to be submitted contain dirty data and TopicPartition that 
> no longer exists
>  (3) The response future of sendOffsetCommitRequest(final Map<TopicPartition, 
> OffsetAndMetadata> offsets) has always failed, and the exception in the 
> future is UnknownTopicOrPartitionException. This exception is allowed to be 
> retried.
> Then since the infinite loop interval above is 100ms by default, 
> timer.sleep(rebalanceConfig.retryBackoffMs);
>  If a large number of consumers have this problem at the same time, a large 
> number of network requests will be generated to the Kafka broker, *resulting 
> in a sharp increase in the cpu and traffic of the broker machine!*
>  
>  
> h2. Suggest
> 1.maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)), 
> the time of this method is recommended not to use max.poll.interval.ms,
>  This parameter is open to users to configure. Through the explanation of 
> this parameter on the official website, I would never think that this 
> parameter will be used in this place. At the same time, it will block 
> KafkaConsumer's poll (final Duration timeout), even if I set consumer.poll 
> (Duration.ofMillis(1000)).
>  2. In fact, in the poll (Timer timer, boolean waitForJoinGroup) method of 
> ConsumerCoordinatord, before calling the ensureActiveGroup method, the 
> consumer ensures that the local metadata is up to date, see the code
>  
> {code:java}
> if (rejoinNeededOrPending()) {
>     // due to a race condition between the initial metadata fetch and the 
> initial rebalance,
>     // we need to ensure that the metadata is fresh before joining initially. 
> This ensures
>     // that we have matched the pattern against the cluster's topics at least 
> once before joining.
>     if (subscriptions.hasPatternSubscription()) {
>         // For consumer group that uses pattern-based subscription, after a 
> topic is created,
>         // any consumer that discovers the topic after metadata refresh can 
> trigger rebalance
>         // across the entire consumer group. Multiple rebalances can be 
> triggered after one topic
>         // creation if consumers refresh metadata at vastly different times. 
> We can significantly
>         // reduce the number of rebalances caused by single topic creation by 
> asking consumer to
>         // refresh metadata before re-joining the group as long as the 
> refresh backoff time has
>         // passed.
>         if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {
>             this.metadata.requestUpdate();
>         }
>         if (!client.ensureFreshMetadata(timer)) {
>             return false;
>         }
>     }
>     if (!ensureActiveGroup(timer)) {
>         return false;
>     }
> }
> {code}
>  
> That is to say, the consumer knows which topic/topicPartition is legal before 
> onJoinPrepare. In this case, why didn't you find the 
> UnknownTopicOrPartitionException in the commitOffsetsSync method mentioned 
> above,do not put the submitted offsets and the latest local metadata together 
> for analysis, remove the non-existent topicpartitions, and then try to submit 
> the offsets again. I think I can break out of the infinite loop by doing this
> 3. Why must the offset be submitted synchronously in the onJoinPrepare 
> method? Can't the offset be submitted asynchronously? Or provide a parameter 
> for the user to choose whether to submit synchronously or asynchronously. Or 
> provide a new parameter to control the maximum number of retries for 
> synchronous submission here, instead of using the Timer constructed by 
> max.poll.interval.ms.
>  And if you don’t really submit the offset here, it will not have much 
> impact. It may cause repeated consumption of some messages. I still suggest 
> to provide a new parameter to control whether you need to submit the offset.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to