lucasbru commented on code in PR #14680: URL: https://github.com/apache/kafka/pull/14680#discussion_r1387628672
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -278,7 +286,8 @@ public PrototypeAsyncConsumer(final Time time, } } - public PrototypeAsyncConsumer(LogContext logContext, + // Visible for testing + PrototypeAsyncConsumer(LogContext logContext, Review Comment: You will have to fix the indentation of the other parameters. ########## core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala: ########## @@ -16,7 +16,7 @@ */ package kafka.api -import kafka.utils.TestUtils.waitUntilTrue +import kafka.utils.TestUtils.{waitUntilTrue} Review Comment: Do we need curly brackets? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -381,21 +391,28 @@ public void commitAsync(OffsetCommitCallback callback) { @Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { CompletableFuture<Void> future = commit(offsets, false); - final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback; future.whenComplete((r, t) -> { - if (t != null) { - commitCallback.onComplete(offsets, new KafkaException(t)); - } else { - commitCallback.onComplete(offsets, null); + if (callback == null) { + if (t != null) { + log.error("Offset commit with offsets {} failed", offsets, t); + } + return; } - }).exceptionally(e -> { - throw new KafkaException(e); + + invoker.submit(new OffsetCommitCallbackTask(callback, offsets, (Exception) t)); }); } // Visible for testing CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets, final boolean isWakeupable) { Review Comment: nit: some parameters final, some non-final ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal } } -} \ No newline at end of file + private void maybeThrowFencedInstanceException() { + if (isFenced) { + throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + + groupInstanceId.orElse("null")); + } + } + + // Visible for testing + void maybeInvokeCallbacks() { Review Comment: nit: how about `maybeInvokeCommitCallbacks`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal } } -} \ No newline at end of file + private void maybeThrowFencedInstanceException() { + if (isFenced) { + throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + + groupInstanceId.orElse("null")); + } + } + + // Visible for testing + void maybeInvokeCallbacks() { + if (callbacks() > 0) { + invoker.executeCallbacks(); + } + } + + // Visible for testing + int callbacks() { + return invoker.callbackQueue.size(); + } + + /** + * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is + * achieved by having the background thread to register a {@link OffsetCommitCallbackTask} to the invoker upon the + * future completion, and execute the callbacks when user polls/commits/closes the consumer. + */ + private class OffsetCommitCallbackInvoker { + // Thread-safe queue to store callbacks + private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = new LinkedBlockingQueue<>(); + + public void submit(final OffsetCommitCallbackTask callback) { + try { + callbackQueue.offer(callback); + } catch (Exception e) { + log.error("Failed to enqueue OffsetCommitCallback", e); + } + } + + public void executeCallbacks() { + LinkedList<OffsetCommitCallbackTask> callbacks = new LinkedList<>(); + callbackQueue.drainTo(callbacks); + while (!callbacks.isEmpty()) { + OffsetCommitCallbackTask callback = callbacks.poll(); + if (callback != null) { + callback.invoke(); + } + } + } + } + + private class OffsetCommitCallbackTask { Review Comment: Should be a static class. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -712,6 +729,10 @@ private void close(Duration timeout, boolean swallowException) { if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(timeout), "Failed to close application event handler with a timeout(ms)=" + timeout, firstException); + // Invoke all callbacks after the background thread exists in case if there are unsent async + // commits + maybeInvokeCallbacks(); Review Comment: Why do we not check for fenced exceptions here? The original consumer attempts to complete all in-flight async commit requests within `timeout` ms. Will closing the background thread take care of this? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -147,6 +151,10 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> { // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); + private volatile boolean isFenced = false; Review Comment: I'm wondering why we need this member. It looks like the volatile boolean is written and read only from the application thread. Could it then just be a boolean? But if so, could we just throw the exception directly and avoid the indirection? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -381,21 +391,28 @@ public void commitAsync(OffsetCommitCallback callback) { @Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { CompletableFuture<Void> future = commit(offsets, false); - final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback; future.whenComplete((r, t) -> { - if (t != null) { - commitCallback.onComplete(offsets, new KafkaException(t)); - } else { - commitCallback.onComplete(offsets, null); + if (callback == null) { + if (t != null) { + log.error("Offset commit with offsets {} failed", offsets, t); + } + return; } - }).exceptionally(e -> { - throw new KafkaException(e); + + invoker.submit(new OffsetCommitCallbackTask(callback, offsets, (Exception) t)); }); } // Visible for testing CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets, final boolean isWakeupable) { + maybeThrowFencedInstanceException(); + maybeInvokeCallbacks(); maybeThrowInvalidGroupIdException(); + + if (offsets.isEmpty()) { + return CompletableFuture.completedFuture(null); + } Review Comment: Maybe not related to this concrete PR, but the original commit code has some logic around updating the last seen leader epochs here, can we omit it, or is it a follow-up task? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal } } -} \ No newline at end of file + private void maybeThrowFencedInstanceException() { + if (isFenced) { + throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + + groupInstanceId.orElse("null")); + } + } + + // Visible for testing + void maybeInvokeCallbacks() { + if (callbacks() > 0) { + invoker.executeCallbacks(); + } + } + + // Visible for testing + int callbacks() { + return invoker.callbackQueue.size(); + } + + /** + * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is + * achieved by having the background thread to register a {@link OffsetCommitCallbackTask} to the invoker upon the + * future completion, and execute the callbacks when user polls/commits/closes the consumer. + */ + private class OffsetCommitCallbackInvoker { + // Thread-safe queue to store callbacks + private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = new LinkedBlockingQueue<>(); + + public void submit(final OffsetCommitCallbackTask callback) { + try { + callbackQueue.offer(callback); + } catch (Exception e) { + log.error("Failed to enqueue OffsetCommitCallback", e); + } + } + + public void executeCallbacks() { + LinkedList<OffsetCommitCallbackTask> callbacks = new LinkedList<>(); Review Comment: Why do we need to create a copy of the queue here? Are we concerned about consumer invocations inside the callback? Then maybe a put a comment here. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal } } -} \ No newline at end of file + private void maybeThrowFencedInstanceException() { + if (isFenced) { + throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + + groupInstanceId.orElse("null")); + } + } + + // Visible for testing + void maybeInvokeCallbacks() { + if (callbacks() > 0) { + invoker.executeCallbacks(); + } + } + + // Visible for testing + int callbacks() { + return invoker.callbackQueue.size(); + } + + /** + * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is + * achieved by having the background thread to register a {@link OffsetCommitCallbackTask} to the invoker upon the Review Comment: ```suggestion * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -1010,15 +1031,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)); } - private class DefaultOffsetCommitCallback implements OffsetCommitCallback { - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { - if (exception != null) - log.error("Offset commit with offsets {} failed", offsets, exception); - } - } - boolean updateAssignmentMetadataIfNeeded(Timer timer) { + maybeInvokeCallbacks(); Review Comment: Sometimes we check for the fencedInstance first, sometimes we invoke the callbacks first. I think in a previous version of the PR and in the original consumer we invoke at the beginning of `poll`. Why did you move this into `updateAssignmentMetadataIfNeeded`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal } } -} \ No newline at end of file + private void maybeThrowFencedInstanceException() { + if (isFenced) { + throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + + groupInstanceId.orElse("null")); + } + } + + // Visible for testing + void maybeInvokeCallbacks() { + if (callbacks() > 0) { + invoker.executeCallbacks(); + } + } + + // Visible for testing + int callbacks() { + return invoker.callbackQueue.size(); + } + + /** + * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is + * achieved by having the background thread to register a {@link OffsetCommitCallbackTask} to the invoker upon the + * future completion, and execute the callbacks when user polls/commits/closes the consumer. + */ + private class OffsetCommitCallbackInvoker { Review Comment: Should be a static class. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal } } -} \ No newline at end of file + private void maybeThrowFencedInstanceException() { + if (isFenced) { + throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + + groupInstanceId.orElse("null")); + } + } + + // Visible for testing + void maybeInvokeCallbacks() { + if (callbacks() > 0) { + invoker.executeCallbacks(); + } + } + + // Visible for testing + int callbacks() { + return invoker.callbackQueue.size(); + } + + /** + * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is + * achieved by having the background thread to register a {@link OffsetCommitCallbackTask} to the invoker upon the + * future completion, and execute the callbacks when user polls/commits/closes the consumer. + */ + private class OffsetCommitCallbackInvoker { + // Thread-safe queue to store callbacks + private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = new LinkedBlockingQueue<>(); + + public void submit(final OffsetCommitCallbackTask callback) { + try { + callbackQueue.offer(callback); + } catch (Exception e) { + log.error("Failed to enqueue OffsetCommitCallback", e); Review Comment: Why do we expect an exception inside offer? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org