m1a2st commented on code in PR #17072:
URL: https://github.com/apache/kafka/pull/17072#discussion_r1741069490


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1140,20 +1154,22 @@ public void testFetchOffsets(
 
         if (requireStable) {
             when(runtime.scheduleWriteOperation(
-                ArgumentMatchers.eq("fetch-offsets"),
+                ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : 
"fetch-offsets"),
                 ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
0)),

Review Comment:
   I think `__consumer_offsets` can replace by 
`Topic.GROUP_METADATA_TOPIC_NAME`, use constant is better than string



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1222,152 +1255,24 @@ public void testFetchOffsetsWithWrappedError(
 
         OffsetFetchRequestData.OffsetFetchRequestGroup request =
             new OffsetFetchRequestData.OffsetFetchRequestGroup()
-                .setGroupId("group")
+                .setGroupId("group");
+        if (!fetchAllOffsets) {
+            request
                 .setTopics(Collections.singletonList(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
                     .setName("foo")
                     .setPartitionIndexes(Collections.singletonList(0))));
-
-        when(runtime.scheduleWriteOperation(
-            ArgumentMatchers.eq("fetch-offsets"),
-            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
-            ArgumentMatchers.eq(Duration.ofMillis(5000)),
-            ArgumentMatchers.any()
-        )).thenReturn(FutureUtils.failedFuture(new 
CompletionException(error.exception())));
-
-        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = service.fetchOffsets(
-            requestContext(ApiKeys.OFFSET_FETCH),
-            request,
-            true
-        );
-
-        assertEquals(
-            new OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId("group")
-                .setErrorCode(expectedError.code()),
-            future.get()
-        );
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testFetchAllOffsets(
-        boolean requireStable
-    ) throws ExecutionException, InterruptedException, TimeoutException {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        GroupCoordinatorService service = new GroupCoordinatorService(
-            new LogContext(),
-            createConfig(),
-            runtime,
-            new GroupCoordinatorMetrics(),
-            createConfigManager()
-        );
-
-        service.startup(() -> 1);
-
-        OffsetFetchRequestData.OffsetFetchRequestGroup request =
-            new OffsetFetchRequestData.OffsetFetchRequestGroup()
-                .setGroupId("group");
-
-        OffsetFetchResponseData.OffsetFetchResponseGroup response =
-            new OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId("group")
-                .setTopics(Collections.singletonList(new 
OffsetFetchResponseData.OffsetFetchResponseTopics()
-                    .setName("foo")
-                    .setPartitions(Collections.singletonList(new 
OffsetFetchResponseData.OffsetFetchResponsePartitions()
-                        .setPartitionIndex(0)
-                        .setCommittedOffset(100L)))));
-
-        if (requireStable) {
-            when(runtime.scheduleWriteOperation(
-                ArgumentMatchers.eq("fetch-all-offsets"),
-                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
0)),
-                ArgumentMatchers.eq(Duration.ofMillis(5000)),
-                ArgumentMatchers.any()
-            )).thenReturn(CompletableFuture.completedFuture(response));
-        } else {
-            when(runtime.scheduleReadOperation(
-                ArgumentMatchers.eq("fetch-all-offsets"),
-                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
0)),
-                ArgumentMatchers.any()
-            )).thenReturn(CompletableFuture.completedFuture(response));
         }
 
-        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = service.fetchAllOffsets(
-            requestContext(ApiKeys.OFFSET_FETCH),
-            request,
-            requireStable
-        );
-
-        assertEquals(response, future.get(5, TimeUnit.SECONDS));
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testFetchAllOffsetsWhenNotStarted(
-        boolean requireStable
-    ) throws ExecutionException, InterruptedException {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        GroupCoordinatorService service = new GroupCoordinatorService(
-            new LogContext(),
-            createConfig(),
-            runtime,
-            new GroupCoordinatorMetrics(),
-            createConfigManager()
-        );
-
-        OffsetFetchRequestData.OffsetFetchRequestGroup request =
-            new OffsetFetchRequestData.OffsetFetchRequestGroup()
-                .setGroupId("group");
-
-        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = service.fetchAllOffsets(
-            requestContext(ApiKeys.OFFSET_FETCH),
-            request,
-            requireStable
-        );
-
-        assertEquals(
-            new OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId("group")
-                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
-            future.get()
-        );
-    }
-
-    @ParameterizedTest
-    @CsvSource({
-        "UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR",
-        "NOT_ENOUGH_REPLICAS, NOT_COORDINATOR",
-        "REQUEST_TIMED_OUT, NOT_COORDINATOR",
-        "NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR",
-        "KAFKA_STORAGE_ERROR, NOT_COORDINATOR",
-    })
-    public void testFetchAllOffsetsWithWrappedError(
-        Errors error,
-        Errors expectedError
-    ) throws ExecutionException, InterruptedException {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        GroupCoordinatorService service = new GroupCoordinatorService(
-            new LogContext(),
-            createConfig(),
-            runtime,
-            new GroupCoordinatorMetrics(),
-            createConfigManager()
-        );
-
-        service.startup(() -> 1);
-
-        OffsetFetchRequestData.OffsetFetchRequestGroup request =
-            new OffsetFetchRequestData.OffsetFetchRequestGroup()
-                .setGroupId("group");
-
         when(runtime.scheduleWriteOperation(
-            ArgumentMatchers.eq("fetch-all-offsets"),
+            ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : 
"fetch-offsets"),
             ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),

Review Comment:
   ditto



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1140,20 +1154,22 @@ public void testFetchOffsets(
 
         if (requireStable) {
             when(runtime.scheduleWriteOperation(
-                ArgumentMatchers.eq("fetch-offsets"),
+                ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : 
"fetch-offsets"),
                 ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
0)),
                 ArgumentMatchers.eq(Duration.ofMillis(5000)),
                 ArgumentMatchers.any()
             )).thenReturn(CompletableFuture.completedFuture(response));
         } else {
             when(runtime.scheduleReadOperation(
-                ArgumentMatchers.eq("fetch-offsets"),
+                ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : 
"fetch-offsets"),
                 ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
0)),

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to