kirktrue commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1792607701
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -173,22 +174,69 @@ private void process(final PollEvent event) {
private void process(final AsyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
+ event.future().complete(null);
return;
}
- CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitAsync(event.offsets());
- future.whenComplete(complete(event.future()));
+ Map<TopicPartition, OffsetAndMetadata> offsets =
event.offsets().isPresent() ?
+ event.offsets().get() : subscriptions.allConsumed();
+ if (offsets.isEmpty()) {
+ event.future().complete(Collections.emptyMap());
+ return;
+ }
+
+ try {
+ maybeUpdateLastSeenEpochIfNewer(offsets);
+ CommitRequestManager manager =
requestManagers.commitRequestManager.get();
+ CompletableFuture<Void> future = manager.commitAsync(offsets);
+ future.whenComplete((r, e) -> {
+ if (e != null) {
+ log.error("Committing offsets failed", e);
+ event.future().completeExceptionally(e);
+ } else {
+ event.future().complete(offsets);
+ }
+ });
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
}
private void process(final SyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
+ event.future().complete(null);
return;
}
- CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitSync(event.offsets(),
event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ Map<TopicPartition, OffsetAndMetadata> offsets =
event.offsets().isPresent() ?
+ event.offsets().get() : subscriptions.allConsumed();
+ if (offsets.isEmpty()) {
+ event.future().complete(Collections.emptyMap());
+ return;
+ }
+
+ try {
+ maybeUpdateLastSeenEpochIfNewer(offsets);
+ CommitRequestManager manager =
requestManagers.commitRequestManager.get();
+ CompletableFuture<Void> future = manager.commitSync(offsets,
event.deadlineMs());
+ future.whenComplete((r, e) -> {
+ if (e != null) {
+ log.error("Committing offsets failed", e);
+ event.future().completeExceptionally(e);
+ } else {
+ event.future().complete(offsets);
+ }
+ });
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
+
+ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition,
OffsetAndMetadata> offsets) {
+ offsets.forEach((topicPartition, offsetAndMetadata) -> {
+ if (offsetAndMetadata.leaderEpoch().isPresent())
+ metadata.updateLastSeenEpochIfNewer(topicPartition,
offsetAndMetadata.leaderEpoch().get());
+ });
}
Review Comment:
The logic in `ApplicationEventProcessor.maybeUpdateLastSeenEpochIfNewer()`
looks very similar to `AsyncKafkaConsumer.updateLastSeenEpochIfNewer()`. Is it
possible to extract that into a shared method somewhere, say `ConsumerUtils`?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -231,6 +231,130 @@ public void testSubscriptionChangeEvent() {
verify(membershipManager, never()).onConsumerPoll();
}
+ @Test
+ public void testSyncCommitEventWithOffsets() {
+ final long deadlineMs = 12345;
+ TopicPartition tp = new TopicPartition("topic", 0);
+ Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+ SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets),
deadlineMs);
+
+ setupProcessor(true);
+ doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+
doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets,
deadlineMs);
Review Comment:
Sorry to be daft, but can you explain why we'd want to complete the `Future`
with `null` instead of result? I'm confused how the call to
`event.future.get()` ends up with the correct value 🤔
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -281,8 +281,9 @@ public void testCommitAsyncWithNullCallback() {
final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
verify(applicationEventHandler).add(commitEventCaptor.capture());
final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
- assertEquals(offsets, commitEvent.offsets());
- assertDoesNotThrow(() -> commitEvent.future().complete(null));
+ assertTrue(commitEvent.offsets().isPresent());
+ assertEquals(offsets, commitEvent.offsets().get());
+ assertDoesNotThrow(() -> commitEvent.future().complete(offsets));
Review Comment:
I know this line is in the original code, but I have a question: in what
case would calling `CompletableFuture.complete()` possibly throw an exception?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -640,8 +639,6 @@ public void testCommitAsyncLeaderEpochUpdate() {
MockCommitCallback callback = new MockCommitCallback();
assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets,
callback));
- verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
- verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
Review Comment:
Same question here as above.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -604,8 +605,6 @@ public void testCommitSyncLeaderEpochUpdate() {
assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets));
- verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
- verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
Review Comment:
Out of curiosity, why don't we want to verify the metadata was updated? That
still happens in the `ApplicationEventProcessor`, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]