m1a2st commented on code in PR #19773:
URL: https://github.com/apache/kafka/pull/19773#discussion_r2108531921


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {

Review Comment:
   Please use `GroupProtocol` instead of `String`
   ```suggestion
       private void testAssignAndCommitSyncNotCommitted(GroupProtocol 
groupProtocol) {
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java:
##########
@@ -84,6 +85,15 @@ public static void consumeAndVerifyRecords(
         );
     }
 
+    public static void pollUntilTrue(Consumer<byte[], byte[]> consumer,
+                                     Supplier<Boolean> testCondition,
+                                     long waitTimeMs, String msg) throws 
InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100));
+            return testCondition.get();
+        }, waitTimeMs, msg);
+    }

Review Comment:
   Please follow the code style used in this file.
   ```suggestion
       public static void pollUntilTrue(
           Consumer<byte[], byte[]> consumer,
           Supplier<Boolean> testCondition,
           long waitTimeMs, 
           String msg
       ) throws InterruptedException {
           TestUtils.waitForCondition(() -> {
               consumer.poll(Duration.ofMillis(100));
               return testCondition.get();
           }, waitTimeMs, msg);
       }
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {

Review Comment:
   ```suggestion
       private void testAssignAndConsume(GroupProtocol groupProtocol) throws 
InterruptedException {
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeSkippingPosition(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            int offset = 1;
+            consumer.seek(tp, offset);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords 
- offset, offset, offset, startingTimestamp + offset);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndFetchCommittedOffsets(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 100;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            // First consumer consumes and commits offsets
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+            consumer.commitSync();
+            assertEquals(numRecords, 
consumer.committed(Set.of(tp)).get(tp).offset());
+        }
+
+        // We should see the committed offsets from another consumer
+        try (Consumer<byte[], byte[]> anotherConsumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            anotherConsumer.assign(List.of(tp));
+            assertEquals(numRecords, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeFromCommittedOffsets(String 
groupProtocol) throws InterruptedException {
+        int numRecords = 100;
+        int offset = 10;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset)));
+            assertEquals(offset, 
consumer.committed(Set.of(tp)).get(tp).offset());
+        }
+
+        // We should see the committed offsets from another consumer
+        try (Consumer<byte[], byte[]> anotherConsumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            assertEquals(offset, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            anotherConsumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(anotherConsumer, tp, 
numRecords - offset, offset, offset, startingTimestamp + offset);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndRetrievingCommittedOffsetsMultipleTimes() 
throws InterruptedException {
+        
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndRetrievingCommittedOffsetsMultipleTimes() 
throws InterruptedException {
+        
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String 
groupProtocol) throws InterruptedException {
+        int numRecords = 100;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {

Review Comment:
   Does this test require manually setting the `GROUP_ID_CONFIG`?



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeSkippingPosition(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            int offset = 1;
+            consumer.seek(tp, offset);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords 
- offset, offset, offset, startingTimestamp + offset);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndFetchCommittedOffsets(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 100;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            // First consumer consumes and commits offsets
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+            consumer.commitSync();
+            assertEquals(numRecords, 
consumer.committed(Set.of(tp)).get(tp).offset());
+        }
+
+        // We should see the committed offsets from another consumer
+        try (Consumer<byte[], byte[]> anotherConsumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            anotherConsumer.assign(List.of(tp));
+            assertEquals(numRecords, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeFromCommittedOffsets(String 
groupProtocol) throws InterruptedException {
+        int numRecords = 100;
+        int offset = 10;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {

Review Comment:
   How about extract the same config as a variable.
   ```Java
   val config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, 
"group1");
   ```
   



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeSkippingPosition(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            int offset = 1;
+            consumer.seek(tp, offset);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords 
- offset, offset, offset, startingTimestamp + offset);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndFetchCommittedOffsets(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 100;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {

Review Comment:
   How about extract the same config as a variable.
   ```Java
   val config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, 
"group1");
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {

Review Comment:
   ```suggestion
       private void testAssignAndCommitSyncAllConsumed(GroupProtocol 
groupProtocol) throws InterruptedException {
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeSkippingPosition(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            int offset = 1;
+            consumer.seek(tp, offset);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords 
- offset, offset, offset, startingTimestamp + offset);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndFetchCommittedOffsets(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 100;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            // First consumer consumes and commits offsets
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+            consumer.commitSync();
+            assertEquals(numRecords, 
consumer.committed(Set.of(tp)).get(tp).offset());
+        }
+
+        // We should see the committed offsets from another consumer
+        try (Consumer<byte[], byte[]> anotherConsumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            anotherConsumer.assign(List.of(tp));
+            assertEquals(numRecords, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeFromCommittedOffsets(String 
groupProtocol) throws InterruptedException {
+        int numRecords = 100;
+        int offset = 10;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset)));
+            assertEquals(offset, 
consumer.committed(Set.of(tp)).get(tp).offset());
+        }
+
+        // We should see the committed offsets from another consumer
+        try (Consumer<byte[], byte[]> anotherConsumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            assertEquals(offset, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            anotherConsumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(anotherConsumer, tp, 
numRecords - offset, offset, offset, startingTimestamp + offset);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndRetrievingCommittedOffsetsMultipleTimes() 
throws InterruptedException {
+        
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndRetrievingCommittedOffsetsMultipleTimes() 
throws InterruptedException {
+        
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String 
groupProtocol) throws InterruptedException {
+        int numRecords = 100;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+
+            // Consume and commit offsets
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+            consumer.commitSync();
+
+            // Check committed offsets twice with same consumer
+            assertEquals(numRecords, 
consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(numRecords, 
consumer.committed(Set.of(tp)).get(tp).offset());

Review Comment:
   Just curious, Why do we need to check the same consumer committed twice?



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeSkippingPosition(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            int offset = 1;
+            consumer.seek(tp, offset);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords 
- offset, offset, offset, startingTimestamp + offset);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndFetchCommittedOffsets(String groupProtocol) 
throws InterruptedException {

Review Comment:
   ```suggestion
       private void testAssignAndFetchCommittedOffsets(GroupProtocol 
groupProtocol) throws InterruptedException {
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeSkippingPosition(String groupProtocol) 
throws InterruptedException {

Review Comment:
   ```suggestion
       private void testAssignAndConsumeSkippingPosition(GroupProtocol 
groupProtocol) throws InterruptedException {
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeSkippingPosition(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            int offset = 1;
+            consumer.seek(tp, offset);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords 
- offset, offset, offset, startingTimestamp + offset);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndFetchCommittedOffsets(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 100;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            // First consumer consumes and commits offsets
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+            consumer.commitSync();
+            assertEquals(numRecords, 
consumer.committed(Set.of(tp)).get(tp).offset());
+        }
+
+        // We should see the committed offsets from another consumer
+        try (Consumer<byte[], byte[]> anotherConsumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            anotherConsumer.assign(List.of(tp));
+            assertEquals(numRecords, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeFromCommittedOffsets(String 
groupProtocol) throws InterruptedException {
+        int numRecords = 100;
+        int offset = 10;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset)));
+            assertEquals(offset, 
consumer.committed(Set.of(tp)).get(tp).offset());
+        }
+
+        // We should see the committed offsets from another consumer
+        try (Consumer<byte[], byte[]> anotherConsumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            assertEquals(offset, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            anotherConsumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(anotherConsumer, tp, 
numRecords - offset, offset, offset, startingTimestamp + offset);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndRetrievingCommittedOffsetsMultipleTimes() 
throws InterruptedException {
+        
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndRetrievingCommittedOffsetsMultipleTimes() 
throws InterruptedException {
+        
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String 
groupProtocol) throws InterruptedException {

Review Comment:
   ```suggestion
       private void 
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol 
groupProtocol) throws InterruptedException {
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerAssignTest {
+
+    public static final int BROKER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+    private final String topic = "topic";
+    private final int partition = 0;
+    TopicPartition tp = new TopicPartition(topic, partition);
+
+    PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception 
{
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
+        testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+        CountConsumerCommitCallback cb = new CountConsumerCommitCallback();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitAsync(cb);
+            ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 
1 || cb.lastError.isPresent(),
+                    10000, "Failed to observe commit callback before timeout");
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncNotCommitted() {
+        testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
+        int numRecords = 10000;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            // No valid fetch position due to the absence of consumer.poll; 
and therefore no offset was committed to
+            // tp. The committed offset should be null. This is intentional.
+            assertNull(committedOffset.get(tp));
+            assertTrue(consumer.assignment().contains(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
+        testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndCommitSyncAllConsumed(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10000;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            consumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> committedOffset = 
consumer.committed(Set.of(tp));
+            assertNotNull(committedOffset);
+            assertNotNull(committedOffset.get(tp));
+            assertEquals(numRecords, committedOffset.get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsume() throws InterruptedException {
+        testAssignAndConsume(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsume(String groupProtocol) throws 
InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeSkippingPosition() throws 
InterruptedException {
+        testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeSkippingPosition(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 10;
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
+            long startingTimestamp = System.currentTimeMillis();
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            int offset = 1;
+            consumer.seek(tp, offset);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords 
- offset, offset, offset, startingTimestamp + offset);
+
+            assertEquals(numRecords, consumer.position(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndFetchCommittedOffsets() throws 
InterruptedException {
+        testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndFetchCommittedOffsets(String groupProtocol) 
throws InterruptedException {
+        int numRecords = 100;
+        long startingTimestamp = System.currentTimeMillis();
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, 
startingTimestamp);
+            consumer.assign(List.of(tp));
+            // First consumer consumes and commits offsets
+            consumer.seek(tp, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 
0, 0, startingTimestamp);
+            consumer.commitSync();
+            assertEquals(numRecords, 
consumer.committed(Set.of(tp)).get(tp).offset());
+        }
+
+        // We should see the committed offsets from another consumer
+        try (Consumer<byte[], byte[]> anotherConsumer = 
clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, 
GROUP_ID_CONFIG, "group1"))) {
+            anotherConsumer.assign(List.of(tp));
+            assertEquals(numRecords, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC.name);
+    }
+
+    @ClusterTest
+    public void testAsyncAssignAndConsumeFromCommittedOffsets() throws 
InterruptedException {
+        testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER.name);
+    }
+
+    private void testAssignAndConsumeFromCommittedOffsets(String 
groupProtocol) throws InterruptedException {

Review Comment:
   ```suggestion
       private void testAssignAndConsumeFromCommittedOffsets(GroupProtocol 
groupProtocol) throws InterruptedException {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to