kirktrue commented on PR #16686: URL: https://github.com/apache/kafka/pull/16686#issuecomment-2448860832
@lianetm:

> If there is time, the classic consumer does wait for the leave request response

You're right. I got my wires crossed, as you state. The new consumer will wait up until the timeout, too. Thanks for catching that goof 🤦‍♂️

> Other than that, I would say that all your points work like that at the moment except only for 4 and 7, where we need to change things to make them happen, so proposal:
>
> 4 and 7 do not work simply because we need thread hops to achieve them, and those hops can't happen on interrupt/low timeouts. So would it work if we just decouple the `UnsubscribeEvent` to avoid those hops in the close case? Meaning that on close, instead of blocking on an `UnsubscribeEvent` that will start by coming back to the app thread, instead we:
>
> 1. trigger callbacks directly in the app thread -> will make 4 happen regardless of timeout/interrupt
> 2. generate a kind of `LeaveGroup` event, blocking on it to allow for the case where there is time to shutdown cleanly, but swallowing interrupt/timeouts. This event would only send the request, no callbacks -> will make 7 happen regardless of timeouts/interrupt, best effort
>
> Would that work? Not sure if I could be missing something, thoughts?

I think that's a good idea and worth pursuing. Off the top of my head, I have two concerns:

1. It duplicates the logic in [`invokeOnPartitionsRevokedOrLostToReleaseAssignment()`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java#L314), although some refactoring could probably share that code, which would soften the blow.
2. It adds to the (mis)use of `SubscriptionState` from the application thread. Could there be in-flight partition assignment changes that only the background thread is aware of? 🤔

I threw together this sketch of my interpretation of your suggestion:
```java
private void releaseAssignmentAndLeaveGroup(final Timer timer, final boolean wasInterrupted) {
    Optional<ConsumerGroupMetadata> consumerGroupMetadata = groupMetadata.get();

    if (!consumerGroupMetadata.isPresent())
        return;

    if (autoCommitEnabled)
        commitSyncAllConsumed(timer);

    applicationEventHandler.add(new CommitOnCloseEvent());

    log.info("Releasing assignment and leaving group before closing consumer");

    try {
        // Step 1: invoke the rebalance callbacks directly on the application thread, so no
        // thread hop is needed and they run regardless of timeout/interrupt.
        Set<TopicPartition> assignedPartitions = subscriptions.assignedPartitions();

        if (!assignedPartitions.isEmpty()) {
            try {
                SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
                droppedPartitions.addAll(assignedPartitions);

                if (consumerGroupMetadata.get().generationId() > 0)
                    rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
                else
                    rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
            } catch (Exception e) {
                log.warn("An error occurred releasing the partitions during close()", e);
            } finally {
                timer.update();
            }
        }

        // Step 2: send only the leave group request from the background thread (no callbacks),
        // blocking at most until the timer's deadline. This runs even when there was nothing
        // to revoke, because the member still has to leave the group.
        applicationEventHandler.addAndGet(new LeaveGroupEvent(calculateDeadlineMs(timer)));
        log.info("Completed releasing assignment and sending leave group to close consumer");
    } catch (TimeoutException e) {
        log.warn("Consumer triggered a leave group event but couldn't " +
            "complete it within {} ms. It will proceed to close.", timer.timeoutMs());
    } finally {
        timer.update();
    }
}
```

Does that seem along the lines of what you envisaged?
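To reason about the timeout and interrupt cases of the two-step proposal quoted above without the consumer internals, here is a minimal, self-contained model of the same ordering. It is only an illustration under stated assumptions: partitions are plain strings, a single-thread `ExecutorService` stands in for the background thread, and `BestEffortClose`, `close`, and `LeaveOutcome` are hypothetical names, not Kafka APIs. Step 1 runs the callback on the calling thread, so it happens even if the caller is already interrupted. Step 2 waits on a `Future` with a bounded timeout and swallows timeout/interrupt so close can proceed.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Hypothetical model of the proposed close() ordering; none of these names are Kafka APIs. */
public class BestEffortClose {

    /** How the best-effort leave group step ended. */
    public enum LeaveOutcome { COMPLETED, TIMED_OUT, INTERRUPTED, FAILED }

    public static LeaveOutcome close(List<String> assignedPartitions,
                                     Consumer<List<String>> onPartitionsRevoked,
                                     Runnable sendLeaveGroup,
                                     long timeoutMs) {
        // Stand-in for the consumer's background thread (an assumption of this model).
        ExecutorService backgroundThread = Executors.newSingleThreadExecutor();
        try {
            // Step 1: callbacks run directly on the calling (application) thread. There is no
            // thread hop, so this runs even with a zero timeout or an interrupted caller.
            if (!assignedPartitions.isEmpty()) {
                try {
                    onPartitionsRevoked.accept(assignedPartitions);
                } catch (RuntimeException e) {
                    // A failing user callback must not prevent leaving the group.
                    System.err.println("Error in revocation callback: " + e);
                }
            }

            // Step 2: hand only the leave group request to the background thread and wait
            // for it on a best-effort basis. It is sent even with no assigned partitions.
            Future<?> leave = backgroundThread.submit(sendLeaveGroup);
            try {
                leave.get(timeoutMs, TimeUnit.MILLISECONDS);
                return LeaveOutcome.COMPLETED;
            } catch (TimeoutException e) {
                return LeaveOutcome.TIMED_OUT;      // swallowed: proceed with close
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // swallowed, but keep the interrupt flag set
                return LeaveOutcome.INTERRUPTED;
            } catch (ExecutionException e) {
                return LeaveOutcome.FAILED;         // the request itself threw
            }
        } finally {
            // Unlike the real network thread, this stand-in is simply stopped here,
            // which also cancels a leave request that has not finished yet.
            backgroundThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        LeaveOutcome outcome = close(
                List.of("orders-0", "orders-1"),
                partitions -> System.out.println("Revoked " + partitions),
                () -> System.out.println("LeaveGroup request sent"),
                1_000);
        System.out.println("Leave group outcome: " + outcome);
    }
}
```

One design choice worth flagging: the model restores the interrupt flag instead of clearing it, so the caller can still observe the interrupt after close returns. Whether the consumer should behave that way is an open assumption, not something settled in this thread.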