lianetm commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1777058782
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -162,23 +163,44 @@ private void process(final PollEvent event) {
}
private void process(final AsyncCommitEvent event) {
- if (!requestManagers.commitRequestManager.isPresent()) {
- return;
- }
-
- CommitRequestManager manager = requestManagers.commitRequestManager.get();
- CompletableFuture<Void> future = manager.commitAsync(event.offsets());
- future.whenComplete(complete(event.future()));
+ process((CommitEvent) event);
Review Comment:
We have two separate event types (async and sync), which are joined into
one here and then split again for the actual processing with:
```
if (event.type() == Type.COMMIT_ASYNC) {
future = manager.commitAsync(offsets);
} else {
future = manager.commitSync(offsets, event.deadlineMs());
}
```
I get that this lets us reuse a bit of code, but I wonder if it's worth the
twisted flow. Could we maybe keep them separate (as they originally are when the
events are created), with process(Sync) ending up calling manager.commitSync and
process(Async) calling manager.commitAsync, and just encapsulate in functions
what we want to reuse in both? (e.g. maybeUpdateLastSeenEpochIfNewer() with
lines 188-191, which would be called from both; similarly for the logic that
retrieves the offsets from the event, lines 180-181.) What do you think?
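To make the suggestion concrete, here is a rough, self-contained sketch of the shape I have in mind. Everything in it is a simplified stand-in rather than the real Kafka code: `Manager`, the string-keyed offsets map, `offsetsToCommit()` and `complete()` are hypothetical, and the body of `maybeUpdateLastSeenEpochIfNewer()` is left empty because the actual metadata update lives in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: every type here is a simplified stand-in, not the real Kafka class.
public class CommitDispatchSketch {

    // Stand-in for CommitRequestManager; records which commit path was taken.
    static class Manager {
        final List<String> calls = new ArrayList<>();

        CompletableFuture<Void> commitAsync(Map<String, Long> offsets) {
            calls.add("async");
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> commitSync(Map<String, Long> offsets, long deadlineMs) {
            calls.add("sync@" + deadlineMs);
            return CompletableFuture.completedFuture(null);
        }
    }

    // Shared base for both event types (offsets keyed by a plain string for brevity).
    static class CommitEvent {
        final Map<String, Long> offsets;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        CommitEvent(Map<String, Long> offsets) {
            this.offsets = offsets;
        }
    }

    static class AsyncCommitEvent extends CommitEvent {
        AsyncCommitEvent(Map<String, Long> offsets) {
            super(offsets);
        }
    }

    static class SyncCommitEvent extends CommitEvent {
        final long deadlineMs;

        SyncCommitEvent(Map<String, Long> offsets, long deadlineMs) {
            super(offsets);
            this.deadlineMs = deadlineMs;
        }
    }

    private final Optional<Manager> commitRequestManager;

    CommitDispatchSketch(Optional<Manager> commitRequestManager) {
        this.commitRequestManager = commitRequestManager;
    }

    // Async path: one overload per event type, no type check inside.
    void process(AsyncCommitEvent event) {
        if (!commitRequestManager.isPresent()) {
            return;
        }
        Map<String, Long> offsets = offsetsToCommit(event);
        maybeUpdateLastSeenEpochIfNewer(offsets);
        commitRequestManager.get().commitAsync(offsets)
            .whenComplete((ignored, error) -> complete(event, error));
    }

    // Sync path: same shared helpers, different manager call.
    void process(SyncCommitEvent event) {
        if (!commitRequestManager.isPresent()) {
            return;
        }
        Map<String, Long> offsets = offsetsToCommit(event);
        maybeUpdateLastSeenEpochIfNewer(offsets);
        commitRequestManager.get().commitSync(offsets, event.deadlineMs)
            .whenComplete((ignored, error) -> complete(event, error));
    }

    // Hypothetical helper: the shared logic that retrieves the offsets from the event.
    private static Map<String, Long> offsetsToCommit(CommitEvent event) {
        return event.offsets;
    }

    // Placeholder for the shared epoch update; intentionally a no-op in this sketch.
    private static void maybeUpdateLastSeenEpochIfNewer(Map<String, Long> offsets) {
    }

    // Hypothetical helper: propagate the manager's result to the event's future.
    private static void complete(CommitEvent event, Throwable error) {
        if (error != null) {
            event.future.completeExceptionally(error);
        } else {
            event.future.complete(null);
        }
    }
}
```

With this shape each event type keeps its own entry point from creation through to the manager call, and the only shared pieces are the small helpers.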
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testSyncCommitEventWithOffsets(boolean withGroupId) {
Review Comment:
Since this is testing commit, it does need a group id, so I expect it
should only run for withGroupId=true.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testSyncCommitEventWithOffsets(boolean withGroupId) {
+ final long deadlineMs = 12345;
+ TopicPartition tp = new TopicPartition("topic", 0);
+ Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+ SyncCommitEvent event = new SyncCommitEvent(offsets, false, deadlineMs);
+
+ setupProcessor(withGroupId);
+ if (withGroupId) {
+ doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
+ }
+
+ processor.process(event);
+ verify(subscriptionState, never()).allConsumed();
+ if (withGroupId) {
+ verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ verify(commitRequestManager).commitSync(offsets, deadlineMs);
+ } else {
+ verify(metadata, never()).updateLastSeenEpochIfNewer(tp, 1);
+ verify(commitRequestManager, never()).commitSync(offsets, deadlineMs);
+ }
+ assertDoesNotThrow(() -> event.future().get());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testSyncCommitEventWithCommitAllConsumed(boolean withGroupId) {
Review Comment:
Same here: only relevant for withGroupId=true, right? (And the same for all
the other commit tests down below.)