kirktrue commented on PR #16686:
URL: https://github.com/apache/kafka/pull/16686#issuecomment-2448860832

   @lianetm—
   
   > If there is time, the classic consumer does wait for the leave request 
response
   
   You're right. I got my wires crossed, as you state. The new consumer will 
wait up until the timeout, too. Thanks for catching that goof 🤦‍♂️
   
   > Other than that, I would say that all your points work like that at the 
moment except only for 4 and 7, where we need to change things to make them 
happen, so proposal: 4 and 7 do not work simply because we need thread hops to 
achieve them, and those hops can't happen on interrupt/low timeouts. So would 
it work if we just decouple the `UnsubscribeEvent` to avoid those hops in the 
close case? Meaning that on close, instead of blocking on an UnsubscribeEvent 
that will start by coming back to the app thread, instead we:
   > 
   > 1. trigger callbacks directly in the app thread -> will make 4 happen 
regardless or timeout/interrupt
   > 2. generate a kind of `LeaveGroup` event, blocking on it to allow for the 
case where there is time to shutdown cleanly, but swallowing 
interrupt/timeouts. This event would only send the request, no callbacks  -> 
will make 7 happen regardless of timeouts/interrupt, best effort
   > 
   > Would that work? not sure if I could be missing something, thoughts?
   
   I think that's a good idea and worth pursuing.
   
   Off the top of my head, two concerns are:
   
   1. It introduces duplication of logic in 
[`invokeOnPartitionsRevokedOrLostToReleaseAssignment()`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java#L314),
 but it's possible some refactoring could be introduced to share code, which 
could soften the blow.
   2. It contributes to the (mis)use of the `SubscriptionState` from the 
application thread. Could there be in-process partition assignment changes 
about which only the background thread is aware? 🤔
   
   I threw this "sketch" of my interpretation of your suggestion...
   
   ```java
       private void releaseAssignmentAndLeaveGroup(final Timer timer, final 
boolean wasInterrupted) {
           Optional<ConsumerGroupMetadata> consumerGroupMetadata = 
groupMetadata.get();
   
           if (consumerGroupMetadata.isEmpty())
               return;
   
           if (autoCommitEnabled)
               commitSyncAllConsumed(timer);
   
           applicationEventHandler.add(new CommitOnCloseEvent());
   
           log.info("Releasing assignment and leaving group before closing 
consumer");
           try {
               Set<TopicPartition> assignedPartitions = 
subscriptions.assignedPartitions();
   
               if (assignedPartitions.isEmpty()) {
                   // Nothing to revoke.
                   return;
               }
   
               try {
                   SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
                   droppedPartitions.addAll(assignedPartitions);
   
                   if (consumerGroupMetadata.get().generationId() > 0)
                       
rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
                   else
                       
rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
               } catch (Exception e) {
                   log.warn("An error occurred releasing the partitions during 
close()", e);
               } finally {
                   timer.update();
               }
   
               applicationEventHandler.addAndGet(new 
LeaveGroupEvent(calculateDeadlineMs(timer)));
               log.info("Completed releasing assignment and sending leave group 
to close consumer");
           } catch (TimeoutException e) {
               log.warn("Consumer triggered an unsubscribe event to leave the 
group but couldn't " +
                   "complete it within {} ms. It will proceed to close.", 
timer.timeoutMs());
           } finally {
               timer.update();
           }
       }
   ```
   
   Does that seem along the lines of what you envisaged?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to