m1a2st commented on code in PR #17072: URL: https://github.com/apache/kafka/pull/17072#discussion_r1741069490
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -1140,20 +1154,22 @@ public void testFetchOffsets( if (requireStable) { when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-offsets"), + ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), Review Comment: I think `__consumer_offsets` can replace by `Topic.GROUP_METADATA_TOPIC_NAME`, use constant is better than string ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -1222,152 +1255,24 @@ public void testFetchOffsetsWithWrappedError( OffsetFetchRequestData.OffsetFetchRequestGroup request = new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group") + .setGroupId("group"); + if (!fetchAllOffsets) { + request .setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics() .setName("foo") .setPartitionIndexes(Collections.singletonList(0)))); - - when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), - ArgumentMatchers.eq(Duration.ofMillis(5000)), - ArgumentMatchers.any() - )).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception()))); - - CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets( - requestContext(ApiKeys.OFFSET_FETCH), - request, - true - ); - - assertEquals( - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId("group") - .setErrorCode(expectedError.code()), - future.get() - ); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testFetchAllOffsets( - boolean requireStable - ) throws ExecutionException, InterruptedException, TimeoutException { - CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); - GroupCoordinatorService service = new GroupCoordinatorService( - new LogContext(), - createConfig(), - runtime, - new GroupCoordinatorMetrics(), - createConfigManager() - ); - - service.startup(() -> 1); - - OffsetFetchRequestData.OffsetFetchRequestGroup request = - new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group"); - - OffsetFetchResponseData.OffsetFetchResponseGroup response = - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId("group") - .setTopics(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") - .setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions() - .setPartitionIndex(0) - .setCommittedOffset(100L))))); - - if (requireStable) { - when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-all-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), - ArgumentMatchers.eq(Duration.ofMillis(5000)), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(response)); - } else { - when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("fetch-all-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(response)); } - CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchAllOffsets( - requestContext(ApiKeys.OFFSET_FETCH), - request, - requireStable - ); - - assertEquals(response, future.get(5, TimeUnit.SECONDS)); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testFetchAllOffsetsWhenNotStarted( - boolean requireStable - ) throws ExecutionException, InterruptedException { - CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); - GroupCoordinatorService service = new GroupCoordinatorService( - new LogContext(), - createConfig(), - runtime, - new GroupCoordinatorMetrics(), - createConfigManager() - ); - - OffsetFetchRequestData.OffsetFetchRequestGroup request = - new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group"); - - CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchAllOffsets( - requestContext(ApiKeys.OFFSET_FETCH), - request, - requireStable - ); - - assertEquals( - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId("group") - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), - future.get() - ); - } - - @ParameterizedTest - @CsvSource({ - "UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR", - "NOT_ENOUGH_REPLICAS, NOT_COORDINATOR", - "REQUEST_TIMED_OUT, NOT_COORDINATOR", - "NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR", - "KAFKA_STORAGE_ERROR, NOT_COORDINATOR", - }) - public void testFetchAllOffsetsWithWrappedError( - Errors error, - Errors expectedError - ) throws ExecutionException, InterruptedException { - CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); - GroupCoordinatorService service = new GroupCoordinatorService( - new LogContext(), - createConfig(), - runtime, - new GroupCoordinatorMetrics(), - createConfigManager() - ); - - service.startup(() -> 1); - - OffsetFetchRequestData.OffsetFetchRequestGroup request = - new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group"); - when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-all-offsets"), + ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), Review Comment: ditto ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -1140,20 +1154,22 @@ public void testFetchOffsets( if (requireStable) { when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-offsets"), + ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(response)); } else { when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("fetch-offsets"), + ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org