[ 
https://issues.apache.org/jira/browse/KAFKA-13425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437413#comment-17437413
 ] 

RivenSun commented on KAFKA-13425:
----------------------------------

[~showuon]

In fact, the purpose of the code change I proposed is to make the behavior of 
the EAGER and COOPERATIVE protocols *finally consistent*: after the rebalance 
is completed, the KafkaConsumer can continue to maintain the paused flags of 
the topicPartitions that are still in the latest assignment.


Currently, *only the COOPERATIVE* protocol achieves these semantics.

> KafkaConsumer#pause() will lose its effect after a groupRebalance occurs, which 
> may cause data loss on the consumer side
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13425
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13425
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>         Attachments: architecture_picture.png
>
>
> h1. Foreword:
> Since I want to decouple the two processes of polling messages and consuming 
> messages on the KafkaConsumer side, I use a "poll --> push" architecture on 
> the Kafka consumer side.
> h2. Architecture
>  see picture "architecture_picture"
> h3. 1) ThreadPoolExecutor
> The key parameters of the ThreadPoolExecutor threadPool are (a minimal sketch follows this list):
> h4. (1) The workQueue is an ArrayBlockingQueue<Runnable>
> h4. (2) The rejection handler implements the RejectedExecutionHandler interface
> h4. (3) threadPool.allowCoreThreadTimeOut(true);
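> A minimal sketch of such a pool (thread counts and queue capacity here are illustrative, not taken from the original setup):
> {code:java}
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> 
> public class PushThreadPool {
>     // Bounded queue: when onMessage() is slow, tasks pile up here instead of in the consumer.
>     private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
>             4, 8, 60L, TimeUnit.SECONDS,
>             new ArrayBlockingQueue<>(1000),
>             // AbortPolicy throws RejectedExecutionException when the queue is full,
>             // which is the back-pressure signal that publish(...) below relies on.
>             new ThreadPoolExecutor.AbortPolicy());
> 
>     public PushThreadPool() {
>         // Let idle core threads time out, as in key parameter (3) above.
>         threadPool.allowCoreThreadTimeOut(true);
>     }
> }
> {code}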
>  
> h3. 2) KafkaConsumer
> The disadvantage of this architecture is that if the business side's 
> onMessage() method takes a long time to execute, it leads to:
> h4. (1) The blockingQueue of the ThreadPoolExecutor accumulates a large number 
> of tasks, and eventually pushing a message fails.
> h4. (2) How should the KafkaConsumer poll() method be handled when the push fails? Two options:
> h5. 1. Stop calling poll()
> KafkaConsumer needs to set 
> *configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);* 
> to prevent the heartbeat thread of KafkaConsumer from noticing that the 
> consumer has not called the poll() method for a long time and automatically 
> executing *maybeLeaveGroup("consumer poll timeout has expired.")*.
> But the most serious consequence of this is that once a rebalance of the 
> group is triggered for any reason, *the rebalance of the entire group will 
> never complete, because this kafkaConsumer does not call the poll() method. 
> This causes all consumers in the entire group to stop consuming.*
> h5. 2. Keep calling poll() while the push fails
> The purpose is to keep up the poll() calls of the kafkaConsumer, but at this 
> point the KafkaConsumer should not poll any messages, because the downstream 
> BlockingQueue that stores the messages is full. So we need the help of 
> KafkaConsumer#pause(...) and KafkaConsumer#resume(...). I named this special 
> poll method maintainPoll4Rebalance().
>  
> h1. maintainPoll4Rebalance: preliminary design ideas
> A simple code sketch:
> {code:java}
>     // Polling loop; kafkaConsumer, threadPool, log, CommonUtil and
>     // ClientConfig are fields/helpers of the surrounding class.
>     public void pollLoop() {
>         while (true) {
>             try {
>                 ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>                 for (ConsumerRecord<String, Object> record : records) {
>                     // Retry the push until the thread pool accepts the task.
>                     while (!publish(record)) {
>                         try {
>                             maintainPoll4Rebalance();
>                         } catch (Exception e) {
>                             log.error("maintain poll for rebalance with error {}", e.getMessage(), e);
>                             CommonUtil.sleep(TimeUnit.SECONDS, 1);
>                         }
>                     }
>                 }
>             } catch (Exception e) {
>                 log.error("KafkaConsumer poll message has error: {}", e.getMessage(), e);
>                 CommonUtil.sleep(TimeUnit.MILLISECONDS, ClientConfig.CLIENT_SLEEP_INTERVAL_MS);
>             }
>         }
>     }
> 
>     private boolean publish(Object message) {
>         try {
>             ...
>             threadPool.execute(() -> onMessage(message));
>         } catch (RejectedExecutionException e) {
>             // The pool's bounded queue is full: the back-pressure signal to the caller.
>             log.error("consumer execute failed with error{}", e.getMessage(), e);
>             return false;
>         } catch (Exception e) {
>             log.error("consumer execute failed with error{}", e.getMessage(), e);
>             return false;
>         }
>         return true;
>     }
> 
>     private void maintainPoll4Rebalance() {
>         try {
>             // Pause every assigned partition so this poll() should return no
>             // records while still heartbeating and participating in rebalances.
>             kafkaConsumer.pause(kafkaConsumer.assignment());
>             ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>             if (!records.isEmpty()) {
>                 log.error("kafka poll for rebalance discard some record!");
>                 for (ConsumerRecord<String, Object> consumerRecord : records) {
>                     log.error("this record need to retry, partition {} ,offset {}", consumerRecord.partition(), consumerRecord.offset());
>                 }
>             }
>         } catch (Exception e) {
>             log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>         } finally {
>             kafkaConsumer.resume(kafkaConsumer.assignment());
>         }
>     }
> {code}
>  
>  
> The maintainPoll4Rebalance() above seems to solve my problem well: when 
> downstream consumption is blocked, the KafkaConsumer keeps up its continuous 
> poll() calls, and it avoids pulling further messages as long as the push 
> keeps failing.
> But in reality, the following logs appear at runtime:
> {code:java}
> [main] ERROR ConsumerTest3 - kafka poll for rebalance discard some record!
> [main] ERROR ConsumerTest3 - this record need to retry, partition 0 ,offset 36901
> {code}
> I obviously called kafkaConsumer.pause(kafkaConsumer.assignment()) before 
> each kafkaConsumer#poll call. Why does kafkaConsumer still pull messages and 
> cause them to be lost? They are lost because the consumer has the auto-commit 
> offset switch turned on.
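> For reference, a sketch of the two consumer settings involved here (other settings are left at their defaults):
> {code:java}
> Properties configs = new Properties();
> // Keeps the member in the group even though poll() is being throttled, as described above.
> configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
> // Auto-commit is what turns the discarded records into real data loss:
> // their offsets get committed although onMessage() never ran for them.
> configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
> {code}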
>  
> h1. RootCause Analysis
>  
> KafkaConsumer#poll:
>  1) updateAssignmentMetadataIfNeeded
>  2) fetcher.fetchedRecords()
>  3) fetcher.sendFetches();
> These three methods are the most critical operations in KafkaConsumer#poll. 
> updateAssignmentMetadataIfNeeded is mainly responsible for group-rebalance-related 
> work, and the root cause appears in the first and second steps.
>  
> h2. 1.updateAssignmentMetadataIfNeeded
>  
> We trace directly to ConsumerCoordinator#onJoinPrepare(...)
> {code:java}
> else {
>     switch (protocol) {
>         case EAGER:
>             // revoke all partitions
>             revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>             exception = invokePartitionsRevoked(revokedPartitions);
>             subscriptions.assignFromSubscribed(Collections.emptySet());
>             break;
>         case COOPERATIVE:
>             // only revoke those partitions that are not in the subscription any more.
>             Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>             revokedPartitions = ownedPartitions.stream()
>                 .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
>                 .collect(Collectors.toSet());
>             if (!revokedPartitions.isEmpty()) {
>                 exception = invokePartitionsRevoked(revokedPartitions);
>                 ownedPartitions.removeAll(revokedPartitions);
>                 subscriptions.assignFromSubscribed(ownedPartitions);
>             }
>             break;
>     }
> }
> {code}
>  
> For the value of the protocol instance variable here, see its initialization code:
> {code:java}
>         // select the rebalance protocol such that:
>         //   1. only consider protocols that are supported by all the assignors. If there is no common protocols supported
>         //      across all the assignors, throw an exception.
>         //   2. if there are multiple protocols that are commonly supported, select the one with the highest id (i.e. the
>         //      id number indicates how advanced the protocol is).
>         // we know there are at least one assignor in the list, no need to double check for NPE
>         if (!assignors.isEmpty()) {
>             List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
>             for (ConsumerPartitionAssignor assignor : assignors) {
>                 supportedProtocols.retainAll(assignor.supportedProtocols());
>             }
>             if (supportedProtocols.isEmpty()) {
>                 throw new IllegalArgumentException("Specified assignors " +
>                     assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) +
>                     " do not have commonly supported rebalance protocol");
>             }
>             Collections.sort(supportedProtocols);
>             protocol = supportedProtocols.get(supportedProtocols.size() - 1);
>         } else {
>             protocol = null;
>         }
> {code}
>  
> After a simple analysis, we can see that as long as supportedProtocols 
> contains the RebalanceProtocol.COOPERATIVE element, the protocol value will 
> be COOPERATIVE; otherwise it will be EAGER.
> But checking the ConsumerPartitionAssignor interface, I found that all of its 
> implementation classes except CooperativeStickyAssignor rely on the default 
> value:
> {code:java}
>     /**
>      * Indicate which rebalance protocol this assignor works with;
>      * by default it should always work with ConsumerPartitionAssignor.RebalanceProtocol.EAGER.
>      */
>     default List<RebalanceProtocol> supportedProtocols() {
>         return Collections.singletonList(RebalanceProtocol.EAGER);
>     }
> {code}
>  
> So the code will reach:
> {code:java}
>                 case EAGER:
>                     // revoke all partitions
>                     revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
>                     exception = invokePartitionsRevoked(revokedPartitions);
>                     subscriptions.assignFromSubscribed(Collections.emptySet());
>                     break;
> {code}
>  
>  
> The problem is here, 
> *subscriptions.assignFromSubscribed(Collections.emptySet()) will clear the 
> assignment in my subscriptions, and then {color:#ff0000}clear the paused mark 
> for TopicPartition.{color}*
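> To see why the paused mark dies with the assignment, here is a simplified model (illustrative only, not the real SubscriptionState source) of how the consumer stores it:
> {code:java}
> import java.util.Collection;
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.common.TopicPartition;
> 
> class SubscriptionModel {
>     // pause(...) only flips a flag inside a per-partition state object.
>     static class TopicPartitionState {
>         boolean paused = false;
>     }
> 
>     final Map<TopicPartition, TopicPartitionState> assignment = new HashMap<>();
> 
>     void assignFromSubscribed(Collection<TopicPartition> partitions) {
>         Map<TopicPartition, TopicPartitionState> next = new HashMap<>();
>         for (TopicPartition tp : partitions)
>             next.put(tp, assignment.getOrDefault(tp, new TopicPartitionState()));
>         // EAGER passes emptySet() here, so every old state object, paused flag
>         // included, is discarded; partitions re-added later start unpaused.
>         assignment.clear();
>         assignment.putAll(next);
>     }
> }
> {code}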
> h2. 2.fetcher.fetchedRecords()
> There is no need to go deep into the code here: fetchedRecords verifies, for 
> each message set (CompletedFetch) buffered in memory, the state of the 
> corresponding TopicPartition:
> h3. 1) if (subscriptions.isPaused(nextInLineFetch.partition))
> h3. 2) if (!subscriptions.isAssigned(completedFetch.partition))
> h3. 3) if (!subscriptions.isFetchable(completedFetch.partition))
>  
> The problem is: if, within the pollTimer specified by the user, a poll(...) 
> call completes the updateAssignmentMetadataIfNeeded operation (so the method 
> returns true), then the paused flag of the TopicPartition has been cleared 
> inside updateAssignmentMetadataIfNeeded; and if the new assignment of the 
> kafkaConsumer still holds this TopicPartition after the rebalance completes, 
> all of the TopicPartition checks above pass.
> And since the *{color:#ff0000}nextInLineFetch variable in KafkaConsumer memory 
> still buffers that TopicPartition's messages,{color} the KafkaConsumer#poll(...) 
> method will still return these messages after pause(...) was called. Even if 
> you always call pause(...) before each poll(...), it will return the messages 
> of this TopicPartition.*
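> To make the failure chain concrete, a simplified sketch (illustrative only, not the actual Fetcher source) of the three checks and why each one passes right after an EAGER rebalance:
> {code:java}
> // tp was paused before the rebalance, was revoked and then re-assigned,
> // and its records are still buffered in nextInLineFetch.
> boolean shouldReturnBufferedRecords(SubscriptionState subscriptions, TopicPartition tp) {
>     if (subscriptions.isPaused(tp))       // false: assignFromSubscribed(emptySet()) wiped the paused flag
>         return false;
>     if (!subscriptions.isAssigned(tp))    // false: tp is back in the new assignment
>         return false;
>     return subscriptions.isFetchable(tp); // true: assigned and not paused, so the records are returned
> }
> {code}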
> If the business side cannot process the messages at this moment, and the 
> KafkaConsumer has the automatic offset-commit switch turned on, the messages 
> are lost on the consumer side. The maximum number of lost messages is 
> max.poll.records. Even this minimal pause-then-poll loop reproduces the problem:
> {code:java}
>         try {
>             kafkaConsumer.pause(kafkaConsumer.assignment());
>             ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
>         } catch (Exception e) {
>             log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>         } finally {
>             kafkaConsumer.resume(kafkaConsumer.assignment());
>         }
> {code}
>  
> h1. maintainPoll4Rebalance() temporary solution
>  
> The paused mark of the TopicPartitions is re-applied in 
> ConsumerRebalanceListener#onPartitionsAssigned(...):
> {code:java}
>     private boolean maintainPoll4Rebalance;
> 
>     private void initKafkaConsumer() {
>         kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() {
>             @Override
>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>                 confirmMessageSync();
>                 log.info("consumer on partition revoked!");
>             }
> 
>             @Override
>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                 try {
>                     // Re-apply the paused mark that the EAGER rebalance just cleared.
>                     if (maintainPoll4Rebalance) {
>                         kafkaConsumer.pause(kafkaConsumer.assignment());
>                     }
>                 } catch (Exception e) {
>                     log.error("consumer onPartitionsAssigned failed with error:{}!", e.getMessage(), e);
>                 }
>                 log.info("consumer on partition assigned!");
>             }
>         });
>     }
> 
>     private void maintainPoll4Rebalance() {
>         try {
>             maintainPoll4Rebalance = true;
>             kafkaConsumer.pause(kafkaConsumer.assignment());
>             ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
>             if (!records.isEmpty()) {
>                 log.error("kafka poll for rebalance discard some record!");
>                 for (ConsumerRecord<String, Object> consumerRecord : records) {
>                     log.error("this record need to retry, partition {} ,offset {}", consumerRecord.partition(), consumerRecord.offset());
>                 }
>             }
>         } catch (Exception e) {
>             log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>         } finally {
>             maintainPoll4Rebalance = false;
>             kafkaConsumer.resume(kafkaConsumer.assignment());
>         }
>     }
> {code}
>  
> After testing, this temporarily solves the problem: after calling 
> kafkaConsumer#pause(...), kafkaConsumer.poll(...) definitely no longer 
> returns messages for the corresponding TopicPartitions.
> h1. Suggestions
> {{1. Precise semantics of kafkaConsumer#pause(…)}}
> First, look at the comments on this method:
> {code:java}
>     /**
>      * Suspend fetching from the requested partitions. Future calls to poll(Duration) will not return
>      * any records from these partitions until they have been resumed using resume(Collection).
>      * Note that this method does not affect partition subscription. In particular, it does not cause
>      * a group rebalance when automatic assignment is used.
>      *
>      * @param partitions The partitions which should be paused
>      * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
>      */
>     @Override
>     public void pause(Collection<TopicPartition> partitions) {
>         acquireAndEnsureOpen();
>         try {
>             log.debug("Pausing partitions {}", partitions);
>             for (TopicPartition partition: partitions) {
>                 subscriptions.pause(partition);
>             }
>         } finally {
>             release();
>         }
>     }
> {code}
>  
> Nothing in these comments tells us that {color:#ff0000}the pause method loses 
> its effect after a groupRebalance.{color}
> And when clearing the paused mark of the topicPartitions, kafkaConsumer does 
> not output any log, so the user cannot perceive that the pause(...) method no 
> longer works.
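> As an illustration, a purely hypothetical log statement (not existing Kafka code) at the point where the assignment is emptied would make this visible:
> {code:java}
> // Hypothetical addition in onJoinPrepare, before assignFromSubscribed(emptySet()):
> for (TopicPartition tp : subscriptions.pausedPartitions()) {
>     log.info("Clearing paused flag of {} because its assignment is revoked during rebalance", tp);
> }
> {code}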
>  
> {{2. When we execute invokePartitionsRevoked(revokedPartitions), should we 
> consider cleaning up the messages in KafkaConsumer memory that correspond to 
> the revokedPartitions?}}
> If they are cleaned up, the cost is: after resume(...), the kafkaConsumer 
> needs to re-issue FetchRequests for the resumedPartitions, which brings 
> additional network transmission.
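> As a sketch of what such a cleanup could look like (assuming the Fetcher helper clearBufferedDataForUnassignedPartitions; the placement is hypothetical):
> {code:java}
> // Hypothetical placement next to invokePartitionsRevoked(...) in onJoinPrepare:
> // keep buffered data only for the partitions we still own, dropping the revoked ones.
> Set<TopicPartition> retainedPartitions = new HashSet<>(subscriptions.assignedPartitions());
> retainedPartitions.removeAll(revokedPartitions);
> fetcher.clearBufferedDataForUnassignedPartitions(retainedPartitions);
> {code}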
>  
> {{3. We had better make the pause(...) method on the KafkaConsumer side 
> unaffected by a groupRebalance}}
> 1) When the rebalance starts to prepare, add new logic to 
> ConsumerCoordinator#onJoinPrepare(...):
> Before executing invokePartitionsRevoked(...) and 
> subscriptions.assignFromSubscribed(...), extract the paused partitions from 
> the current subscriptions.assignment of the KafkaConsumer into 
> customerPausedPartitions, which should be an instance variable of 
> ConsumerCoordinator.
> {code:java}
> // New code: remember the paused partitions before they are revoked,
> // in front of the two existing calls below.
> customerPausedPartitions = subscriptions.pausedPartitions();
> 
> exception = invokePartitionsRevoked(...);
> subscriptions.assignFromSubscribed(...);
> {code}
>  
> 2) After the rebalance is completed, add new logic to 
> ConsumerCoordinator#onJoinComplete(...)
> {code:java}
>     protected void onJoinComplete(int generation,
>                                   String memberId,
>                                   String assignmentStrategy,
>                                   ByteBuffer assignmentBuffer) {
>         log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);
>         ......
>         subscriptions.assignFromSubscribed(assignedPartitions);
> 
>         // New code: re-apply the paused mark to partitions that survived the rebalance
>         if (customerPausedPartitions != null && !customerPausedPartitions.isEmpty()) {
>             customerPausedPartitions.forEach(topicPartition -> {
>                 if (subscriptions.isAssigned(topicPartition))
>                     subscriptions.pause(topicPartition);
>             });
>             customerPausedPartitions = null;
>         }
> 
>         // Add partitions that were not previously owned but are now assigned
>         firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
>         ......
>     }
> {code}
> The above is just a first draft of the change. It only guarantees that after 
> a rebalance, the topicPartitions still held in the new assignment of the 
> KafkaConsumer keep their paused mark.
> *Note*: if the new assignment of the kafkaConsumer no longer contains 
> topicPartitions that were paused before the rebalance, the paused mark of 
> these topicPartitions is lost forever on the kafkaConsumer side, even if a 
> future rebalance hands these partitions back to the kafkaConsumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
