lianetm commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1828337700
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent
event) {
}
private void process(final AsyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
+ event.future().complete(Map.of());
return;
}
- CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitAsync(event.offsets());
- future.whenComplete(complete(event.future()));
+ try {
+ CommitRequestManager manager =
requestManagers.commitRequestManager.get();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitAsync(event.offsets());
+ future.whenComplete((offsets, throwable) -> {
+ if (throwable != null) {
+ log.error("Committing offsets failed", throwable);
+ event.future().completeExceptionally(throwable);
+ } else {
+ event.future().complete(offsets);
+ }
+ });
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
}
private void process(final SyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
+ event.future().complete(Map.of());
return;
}
- CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitSync(event.offsets(),
event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ try {
+ CommitRequestManager manager =
requestManagers.commitRequestManager.get();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitSync(event.offsets(), event.deadlineMs());
+ future.whenComplete((offsets, throwable) -> {
+ if (throwable != null) {
+ log.error("Committing offsets failed", throwable);
+ event.future().completeExceptionally(throwable);
+ } else {
+ event.future().complete(offsets);
+ }
+ });
Review Comment:
could we simplify this to `future.whenComplete(complete(event.future()))` ?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent
event) {
}
private void process(final AsyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
+ event.future().complete(Map.of());
return;
}
- CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitAsync(event.offsets());
- future.whenComplete(complete(event.future()));
+ try {
+ CommitRequestManager manager =
requestManagers.commitRequestManager.get();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitAsync(event.offsets());
+ future.whenComplete((offsets, throwable) -> {
+ if (throwable != null) {
+ log.error("Committing offsets failed", throwable);
+ event.future().completeExceptionally(throwable);
+ } else {
+ event.future().complete(offsets);
+ }
+ });
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
}
private void process(final SyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
+ event.future().complete(Map.of());
Review Comment:
Should this be `completeExceptionally` instead?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java:
##########
@@ -20,13 +20,15 @@
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
+import java.util.Optional;
/**
* Event to commit offsets without waiting for a response, so the request
won't be retried.
+ * If no offsets are provided, this event will commit all offsets.
Review Comment:
nit: all **consumed** offsets
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -274,13 +278,146 @@ public void testPollEnsureEmptyPendingRequestAfterPoll()
{
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(offsets);
+ commitRequestManager.commitAsync(Optional.of(offsets));
assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
assertEquals(1,
commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
assertEmptyPendingRequests(commitRequestManager);
}
+ @Test
+ public void testCommitSync() {
+ subscriptionState = mock(SubscriptionState.class);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ TopicPartition tp = new TopicPartition("topic", 1);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+
+ CommitRequestManager commitRequestManager = create(true, 100);
Review Comment:
we don't need autoCommit enabled for this test, so even if it may be
harmless, it's probably better not to mix it in, to avoid confusion?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -187,22 +188,62 @@ private void process(final CreateFetchRequestsEvent
event) {
private void process(final AsyncCommitEvent event) {
Review Comment:
I see, and I would say it's fine if we move it to the `commitMgr` as you
did, as long as we update metadata before completing the result futures that
the mgr returns
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1676,7 +1666,8 @@ public void testEnsurePollEventSentOnConsumerPoll() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
- "client-id");
+ "client-id",
+ false);
Review Comment:
is indentation off here? (actually I think the "false" is the one that's ok,
with 4 spaces, vs all the others with 8. I think we usually do 4 here)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent
event) {
}
private void process(final AsyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
+ event.future().complete(Map.of());
Review Comment:
Would it be better to complete exceptionally? I expect this would only happen
during the development phase if a bug is introduced, but still, it would be
easier to catch if we make the app event fail because the expected manager is
not present. I notice there's no consistency in how we handle these (some
process calls complete exceptionally, others just return/complete), but I would
lean towards not swallowing the unlikely/unexpected error here.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -596,8 +586,6 @@ public void testCommitAsyncLeaderEpochUpdate() {
MockCommitCallback callback = new MockCommitCallback();
assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets,
callback));
- verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
Review Comment:
as above, agree with removing this as the `updateLastSeenEpochIfNewer` is
now done by the `commitReqMgr`, but then this test loses its value, so we should
remove it (coverage replaced in the CommitReqMgrTest)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java:
##########
@@ -20,14 +20,15 @@
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
+import java.util.Optional;
/**
* Event to commit offsets waiting for a response and retrying on expected
retriable errors until
- * the timer expires.
+ * the timer expires. If no offsets are provided, this event will commit all
offsets.
Review Comment:
nit: ...all **consumed** offsets
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent
event) {
}
private void process(final AsyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
+ if (requestManagers.commitRequestManager.isEmpty()) {
+ event.future().complete(Map.of());
return;
}
- CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitAsync(event.offsets());
- future.whenComplete(complete(event.future()));
+ try {
+ CommitRequestManager manager =
requestManagers.commitRequestManager.get();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitAsync(event.offsets());
+ future.whenComplete((offsets, throwable) -> {
+ if (throwable != null) {
+ log.error("Committing offsets failed", throwable);
+ event.future().completeExceptionally(throwable);
+ } else {
+ event.future().complete(offsets);
+ }
+ });
Review Comment:
could we simplify this to `future.whenComplete(complete(event.future()))`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -225,7 +249,13 @@ private void process(final FetchCommittedOffsetsEvent
event) {
}
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.fetchOffsets(event.partitions(), event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ future.whenComplete((value, exception) -> {
+ if (exception != null)
+ event.future().completeExceptionally(exception);
+ else {
+ event.future().complete(value);
+ }
+ });
Review Comment:
why not reuse `future.whenComplete(complete(event.future()))` like before?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -274,13 +278,146 @@ public void testPollEnsureEmptyPendingRequestAfterPoll()
{
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
- commitRequestManager.commitAsync(offsets);
+ commitRequestManager.commitAsync(Optional.of(offsets));
assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
assertEquals(1,
commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
assertEmptyPendingRequests(commitRequestManager);
}
+ @Test
+ public void testCommitSync() {
+ subscriptionState = mock(SubscriptionState.class);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ TopicPartition tp = new TopicPartition("topic", 1);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+
+ CommitRequestManager commitRequestManager = create(true, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
+ Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
+ assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
+ List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
+ pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+ "topic",
+ 1,
+ (short) 1,
+ Errors.NONE)));
+
+ verify(subscriptionState, never()).allConsumed();
+ verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertTrue(future.isDone());
+ assertEquals(offsets, commitOffsets);
+ }
+
+ @Test
+ public void testCommitSyncWithEmptyOffsets() {
+ subscriptionState = mock(SubscriptionState.class);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ TopicPartition tp = new TopicPartition("topic", 1);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+ doReturn(offsets).when(subscriptionState).allConsumed();
+
+ CommitRequestManager commitRequestManager = create(true, 100);
Review Comment:
ditto (autoCommit does not need to be enabled for this test either)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -439,14 +450,15 @@ private OffsetCommitRequestState
createOffsetCommitRequest(final Map<TopicPartit
}
private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
- CompletableFuture<Void> result) {
+ CompletableFuture<Map<TopicPartition,
OffsetAndMetadata>> result) {
pendingRequests.addOffsetCommitRequest(requestAttempt);
// Retry the same commit request while it fails with
RetriableException and the retry
// timeout hasn't expired.
requestAttempt.future.whenComplete((res, error) -> {
if (error == null) {
- result.complete(null);
+ result.complete(requestAttempt.offsets);
+ maybeUpdateLastSeenEpochIfNewer(requestAttempt.offsets);
Review Comment:
ditto (the metadata update should go before completing the `result` future)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -381,20 +386,22 @@ private void
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
* exceptionally depending on the response. If the request fails with a
retriable error, the
* future will be completed with a {@link RetriableCommitFailedException}.
*/
- public CompletableFuture<Void> commitAsync(final Map<TopicPartition,
OffsetAndMetadata> offsets) {
- if (offsets.isEmpty()) {
+ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
offsets.orElseGet(subscriptions::allConsumed);
+ if (commitOffsets.isEmpty()) {
log.debug("Skipping commit of empty offsets");
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(Map.of());
}
- OffsetCommitRequestState commitRequest =
createOffsetCommitRequest(offsets, Long.MAX_VALUE);
+ OffsetCommitRequestState commitRequest =
createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
pendingRequests.addOffsetCommitRequest(commitRequest);
- CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
asyncCommitResult = new CompletableFuture<>();
commitRequest.future.whenComplete((committedOffsets, error) -> {
if (error != null) {
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
} else {
- asyncCommitResult.complete(null);
+ asyncCommitResult.complete(commitOffsets);
+ maybeUpdateLastSeenEpochIfNewer(commitOffsets);
Review Comment:
this should probably go before completing the "asyncCommitResult" future, to
ensure that we don't signal the caller that the commit completed if we haven't
updated the metadata yet.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -604,8 +605,6 @@ public void testCommitSyncLeaderEpochUpdate() {
assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets));
- verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
- verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
Review Comment:
agree with removing the checks on metadata, but then I think we should
remove the `testCommitSyncLeaderEpochUpdate` completely. The metadata update is
not done in this component anymore, so this ends up simply testing commitSync,
which is already covered. (coverage for the leader epoch update is now in the
commitReqMgr, which does the metadata update)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -532,6 +544,7 @@ private void fetchOffsetsWithRetries(final
OffsetFetchRequestState fetchRequest,
}
if (error == null) {
result.complete(res);
+ maybeUpdateLastSeenEpochIfNewer(res);
Review Comment:
ditto (the metadata update should go before completing the `result` future)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]