chia7712 commented on code in PR #16876:
URL: https://github.com/apache/kafka/pull/16876#discussion_r1721174887


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -336,9 +336,8 @@ private CompletableFuture<ListOffsetResult> 
buildListOffsetRequestToNode(
             Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> 
targetTimes,
             boolean requireTimestamps,
             List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
-        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
-                .forConsumer(requireTimestamps, isolationLevel, false, false)
-                
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
+        List<ListOffsetsRequestData.ListOffsetsTopic> topics = 
ListOffsetsRequest.toListOffsetsTopics(targetTimes);

Review Comment:
   If there is no strong reason for this change, could we keep the fluent pattern?
   ```java
           ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
                   .forConsumer(requireTimestamps, isolationLevel)
                   .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
   ```



##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -288,8 +288,8 @@ class RequestQuotaTest extends BaseRequestTest {
               .setPartitionIndex(tp.partition)
               .setTimestamp(0L)
               .setCurrentLeaderEpoch(15)).asJava)
-          ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false, false)
-            .setTargetTimes(List(topic).asJava)
+          val targetTimes = List(topic).asJava

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -2365,9 +2366,10 @@ private ListOffsetsRequest createListOffsetRequest(short 
version) {
                             .setPartitionIndex(0)
                             .setTimestamp(1000000L)
                             .setCurrentLeaderEpoch(5)));
+            List<ListOffsetsTopic> topics = singletonList(topic);
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, 
false)
-                    .setTargetTimes(singletonList(topic))
+                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+                    .setTargetTimes(topics)

Review Comment:
   ditto



##########
core/src/test/scala/unit/kafka/server/LogOffsetTest.scala:
##########
@@ -60,8 +60,9 @@ class LogOffsetTest extends BaseRequestTest {
   @ValueSource(strings = Array("zk", "kraft"))
   def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition("foo", 0)
-    val request = ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false, false)
-      .setTargetTimes(buildTargetTimes(topicPartition, 
ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
+    val targetTimes = buildTargetTimes(topicPartition, 
ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -2378,9 +2380,10 @@ private ListOffsetsRequest createListOffsetRequest(short 
version) {
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(singletonList(partition));
+            List<ListOffsetsTopic> topics = singletonList(topic);
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false, 
false)
-                    .setTargetTimes(singletonList(topic))
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .setTargetTimes(topics)

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -2354,9 +2354,10 @@ private ListOffsetsRequest createListOffsetRequest(short 
version) {
                             .setTimestamp(1000000L)
                             .setMaxNumOffsets(10)
                             .setCurrentLeaderEpoch(5)));
+            List<ListOffsetsTopic> topics = singletonList(topic);
             return ListOffsetsRequest.Builder
-                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, 
false, false)
-                    .setTargetTimes(singletonList(topic))
+                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+                    .setTargetTimes(topics)

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java:
##########
@@ -68,11 +76,11 @@ public void testGetErrorResponse() {
                                 new ListOffsetsPartition()
                                     .setPartitionIndex(0))));
             ListOffsetsRequest request = ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false, 
false)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
                     .setTargetTimes(topics)
                     .build(version);
             ListOffsetsResponse response = (ListOffsetsResponse) 
request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
-    

Review Comment:
   Please revert this unrelated whitespace change.



##########
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##########
@@ -102,6 +110,7 @@ public ListOffsetsRequest build(short version) {
         public String toString() {
             return data.toString();
         }
+

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -287,15 +287,15 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   private def createListOffsetsRequest = {
-    requests.ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false, false).setTargetTimes(
-      List(new ListOffsetsTopic()
-        .setName(tp.topic)
-        .setPartitions(List(new ListOffsetsPartition()
-          .setPartitionIndex(tp.partition)
-          .setTimestamp(0L)
-          .setCurrentLeaderEpoch(27)).asJava)).asJava
-      ).
-      build()
+    val topics = List(new ListOffsetsTopic()

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to