AndrewJSchofield commented on code in PR #20909:
URL: https://github.com/apache/kafka/pull/20909#discussion_r2539709848


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3447,6 +3730,37 @@ private void alterShareIsolationLevel(String groupId, String newValue) {
         }
     }
 
+    private List<Integer> topicPartitionLeader(Admin adminClient, String topicName, int partition)
+        throws InterruptedException, ExecutionException {
+        return adminClient.describeTopics(List.of(topicName)).allTopicNames().get().get(topicName)
+            .partitions().stream()
+            .filter(info -> info.partition() == partition)
+            .map(info -> info.leader().id())
+            .filter(info -> info != -1)
+            .toList();
+    }
+
+    private SharePartitionOffsetInfo sharePartitionDescription(Admin adminClient, String groupId, TopicPartition tp)

Review Comment:
   nit: Method name seems wrong. It's returning the share-partition offset 
info, not the description.
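
   For example (just a sketch of the rename, any clearer name works), the declaration could become:

      private SharePartitionOffsetInfo sharePartitionOffsetInfo(Admin adminClient, String groupId, TopicPartition tp)

   with the body left unchanged.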



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3447,6 +3730,37 @@ private void alterShareIsolationLevel(String groupId, String newValue) {
         }
     }
 
+    private List<Integer> topicPartitionLeader(Admin adminClient, String topicName, int partition)
+        throws InterruptedException, ExecutionException {

Review Comment:
   nit: The indentation on this method and the next is a bit odd because the 
throws and the start of the method body are equally indented. I'd just put the 
throws on the same line as the argument list for these methods.
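
   For example, keeping the body exactly as it is:

      private List<Integer> topicPartitionLeader(Admin adminClient, String topicName, int partition) throws InterruptedException, ExecutionException {
          return adminClient.describeTopics(List.of(topicName)).allTopicNames().get().get(topicName)
              .partitions().stream()
              .filter(info -> info.partition() == partition)
              .map(info -> info.leader().id())
              .filter(info -> info != -1)
              .toList();
      }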



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3136,6 +3134,291 @@ private void verifyYammerMetricCount(String filterString, int count) {
         assertEquals(count, ((Meter) renewAck).count());
     }
 
+    @ClusterTest
+    public void testDescribeShareGroupOffsetsForEmptySharePartition() {
+        String groupId = "group1";
+        try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created.
+            shareConsumer.poll(Duration.ofMillis(2000));
+            SharePartitionOffsetInfo sharePartitionDescription = sharePartitionDescription(adminClient, groupId, tp);
+            // Since the partition is empty, and no records have been consumed, the share partition startOffset will be
+            // -1. Thus, there will be no description for the share partition.
+            assertNull(sharePartitionDescription);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForSingleShareConsumer() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+            Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForMultipleShareConsumers() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(groupId);
+             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            producer.flush();
+            shareConsumer1.subscribe(List.of(tp.topic()));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Polling share consumer 1 to make sure the share partition is created and the records are consumed.
+            waitedPoll(shareConsumer1, 2500L, 1);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer1.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing more records to the share partition.
+            producer.send(record);
+            // Polling share consumer 2 this time.
+            waitedPoll(shareConsumer2, 2500L, 1);
+            // Since the consumed record hasn't been acknowledged yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer2.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithReleaseAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now release the record - it should be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+            // After releasing the lag should be 1, because the record is released for redelivery.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // The record is now consumed again.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record to mark it as consumed.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting the record, the lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithRejectAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now reject the record - it should not be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.REJECT));
+            shareConsumer.commitSync();
+            // After rejecting the lag should be 0, because the record is permanently rejected and offset moves forward.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
+        }
+    )
+    public void testSharePartitionLagOnGroupCoordinatorMovement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            String topicName = "testTopicWithReplicas";
+            // Create a topic with replication factor 3
+            Uuid tpId = createTopic(topicName, 1, 3);
+            TopicPartition tp = new TopicPartition(topicName, 0);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Produce first record and consume it
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(tpId, tp));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            int consumerOffsetsTp = Utils.abs(groupId.hashCode()) % 3;
+            List<Integer> curGroupCoordNodeId;
+            // Find the broker which is the group coordinator for the share group.
+            curGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+            assertEquals(1, curGroupCoordNodeId.size());
+            // Shut down the coordinator broker
+            KafkaBroker broker = cluster.brokers().get(curGroupCoordNodeId.get(0));
+            cluster.shutdownBroker(curGroupCoordNodeId.get(0));
+            // Wait for it to be completely shut down
+            broker.awaitShutdown();
+            // Wait for the leaders of share coordinator, group coordinator and topic partition to be elected, if needed, on a different broker.
+            TestUtils.waitForCondition(() -> {
+                List<Integer> newShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
+                List<Integer> newGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+                List<Integer> newTopicPartitionLeader = topicPartitionLeader(adminClient, tp.topic(), tp.partition());
+
+                return newShareCoordNodeId.size() == 1 && !Objects.equals(newShareCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curGroupCoordNodeId.get(0));
+            }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
+            // After group coordinator shutdown, check that lag is still 1
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
+        }
+    )
+    public void testSharePartitionLagOnShareCoordinatorMovement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            String topicName = "testTopicWithReplicas";
+            // Create a topic with replication factor 3
+            Uuid tpId = createTopic(topicName, 1, 3);
+            TopicPartition tp = new TopicPartition(topicName, 0);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Produce first record and consume it
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(tpId, tp));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            int consumerOffsetsTp = Utils.abs(groupId.hashCode()) % 3;
+            List<Integer> curShareCoordNodeId;
+            // Find the broker which is the share coordinator for the share partition.
+            curShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
+            assertEquals(1, curShareCoordNodeId.size());
+            // Shut down the coordinator broker
+            KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
+            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+            // Wait for it to be completely shut down
+            broker.awaitShutdown();
+            // Wait for the leaders of share coordinator, group coordinator and topic partition to be elected, if needed, on a different broker.
+            TestUtils.waitForCondition(() -> {
+                List<Integer> newShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
+                List<Integer> newGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+                List<Integer> newTopicPartitionLeader = topicPartitionLeader(adminClient, tp.topic(), tp.partition());
+
+                return newShareCoordNodeId.size() == 1 && !Objects.equals(newShareCoordNodeId.get(0), curShareCoordNodeId.get(0)) &&
+                    newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curShareCoordNodeId.get(0)) &&
+                    newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curShareCoordNodeId.get(0));
+            }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
+            // After share coordinator shutdown and new leaderS election, check that lag is still 1

Review Comment:
   nit: new "leader's" not "leaderS"



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3136,6 +3134,291 @@ private void verifyYammerMetricCount(String filterString, int count) {
         assertEquals(count, ((Meter) renewAck).count());
     }
 
+    @ClusterTest
+    public void testDescribeShareGroupOffsetsForEmptySharePartition() {
+        String groupId = "group1";
+        try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created.
+            shareConsumer.poll(Duration.ofMillis(2000));
+            SharePartitionOffsetInfo sharePartitionDescription = sharePartitionDescription(adminClient, groupId, tp);
+            // Since the partition is empty, and no records have been consumed, the share partition startOffset will be
+            // -1. Thus, there will be no description for the share partition.
+            assertNull(sharePartitionDescription);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForSingleShareConsumer() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+            Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForMultipleShareConsumers() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(groupId);
+             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            producer.flush();
+            shareConsumer1.subscribe(List.of(tp.topic()));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Polling share consumer 1 to make sure the share partition is created and the records are consumed.
+            waitedPoll(shareConsumer1, 2500L, 1);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer1.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing more records to the share partition.
+            producer.send(record);
+            // Polling share consumer 2 this time.
+            waitedPoll(shareConsumer2, 2500L, 1);
+            // Since the consumed record hasn't been acknowledged yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer2.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithReleaseAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now release the record - it should be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+            // After releasing the lag should be 1, because the record is released for redelivery.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // The record is now consumed again.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record to mark it as consumed.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting the record, the lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithRejectAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now reject the record - it should not be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.REJECT));
+            shareConsumer.commitSync();
+            // After rejecting the lag should be 0, because the record is permanently rejected and offset moves forward.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")

Review Comment:
   I read the code from the bottom so my comment about not needing 3 partitions 
for these internal topics stands here too. You do need replication factor 3.
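
   i.e. (sketch) keep the replication factor at 3 but reduce the partition counts:

      @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
      @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),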



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3136,6 +3134,291 @@ private void verifyYammerMetricCount(String filterString, int count) {
         assertEquals(count, ((Meter) renewAck).count());
     }
 
+    @ClusterTest
+    public void testDescribeShareGroupOffsetsForEmptySharePartition() {
+        String groupId = "group1";
+        try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created.
+            shareConsumer.poll(Duration.ofMillis(2000));
+            SharePartitionOffsetInfo sharePartitionDescription = sharePartitionDescription(adminClient, groupId, tp);
+            // Since the partition is empty, and no records have been consumed, the share partition startOffset will be
+            // -1. Thus, there will be no description for the share partition.
+            assertNull(sharePartitionDescription);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForSingleShareConsumer() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+            Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForMultipleShareConsumers() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(groupId);
+             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            producer.flush();
+            shareConsumer1.subscribe(List.of(tp.topic()));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Polling share consumer 1 to make sure the share partition is created and the records are consumed.
+            waitedPoll(shareConsumer1, 2500L, 1);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer1.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing more records to the share partition.
+            producer.send(record);
+            // Polling share consumer 2 this time.
+            waitedPoll(shareConsumer2, 2500L, 1);
+            // Since the consumed record hasn't been acknowledged yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer2.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithReleaseAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now release the record - it should be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+            // After releasing the lag should be 1, because the record is released for redelivery.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // The record is now consumed again.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record to mark it as consumed.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting the record, the lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithRejectAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now reject the record - it should not be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.REJECT));
+            shareConsumer.commitSync();
+            // After rejecting the lag should be 0, because the record is permanently rejected and offset moves forward.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
+        }
+    )
+    public void testSharePartitionLagOnGroupCoordinatorMovement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            String topicName = "testTopicWithReplicas";
+            // Create a topic with replication factor 3
+            Uuid tpId = createTopic(topicName, 1, 3);
+            TopicPartition tp = new TopicPartition(topicName, 0);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Produce first record and consume it
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(tpId, tp));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            int consumerOffsetsTp = Utils.abs(groupId.hashCode()) % 3;
+            List<Integer> curGroupCoordNodeId;
+            // Find the broker which is the group coordinator for the share group.
+            curGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+            assertEquals(1, curGroupCoordNodeId.size());
+            // Shut down the coordinator broker
+            KafkaBroker broker = cluster.brokers().get(curGroupCoordNodeId.get(0));
+            cluster.shutdownBroker(curGroupCoordNodeId.get(0));
+            // Wait for it to be completely shut down
+            broker.awaitShutdown();
+            // Wait for the leaders of share coordinator, group coordinator and topic partition to be elected, if needed, on a different broker.
+            TestUtils.waitForCondition(() -> {
+                List<Integer> newShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
+                List<Integer> newGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+                List<Integer> newTopicPartitionLeader = topicPartitionLeader(adminClient, tp.topic(), tp.partition());
+
+                return newShareCoordNodeId.size() == 1 && !Objects.equals(newShareCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curGroupCoordNodeId.get(0));
+            }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
+            // After group coordinator shutdown, check that lag is still 1
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),

Review Comment:
   You do need replication factor 3, but you only need one partition for each 
of these topics. Then you'd not have to fiddle around trying to calculate the 
partitions used for the internal topics.
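
   For example, something like this (a sketch, not a tested change):

      @ClusterTest(
          brokers = 3,
          serverProperties = {
              @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
              @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
              @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
              @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
          }
      )

   With a single partition for each internal topic, the hash-modulo calculations always give 0, so shareGroupStateTp and consumerOffsetsTp would both simply be partition 0.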



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
