AndrewJSchofield commented on code in PR #20900:
URL: https://github.com/apache/kafka/pull/20900#discussion_r2536924233


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2998,6 +3008,119 @@ public void testRenewAcknowledgementOnCommitSync() {
                 shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
             }
         }
+        verifyYammerMetricCount("ackType=Renew", 5);
+    }
+
+    @ClusterTest
+    public void testRenewAcknowledgementInvalidStateRecord() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) ->
+                offsetsByTopicPartition.forEach((tip, offsets) -> 
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message 
".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+
+            for (ConsumerRecord<byte[], byte[]> rec : records) {
+                shareConsumer.acknowledge(rec, AcknowledgeType.REJECT);
+                shareConsumer.commitSync();
+                assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge(rec, AcknowledgeType.RENEW));
+            }
+        }
+        verifyYammerMetricCount("ackType=Renew", 0);
+    }
+
+    @ClusterTest(
+        brokers = 1,
+        serverProperties = {
+            @ClusterConfigProperty(key = 
"group.share.record.lock.duration.ms", value = "12000"),
+            @ClusterConfigProperty(key = 
"group.share.min.record.lock.duration.ms", value = "12000"),
+        }
+    )
+    public void testRenewAcknowledgementNoResultInPoll() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) ->
+                offsetsByTopicPartition.forEach((tip, offsets) -> 
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+            for (int i = 0; i < 10; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message 
" + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 10);
+            assertEquals(10, records.count());
+
+            int count = 0;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                if (count % 2 == 0) {
+                    shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+                } else {
+                    shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
+                }
+                count++;
+            }
+
+            // 5 more records (total 15 produced).
+            for (int i = 10; i < 15; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message 
" + i).getBytes());
+                producer.send(record);
+            }
+
+            // Get the rest of all 5 records.
+            records = waitedPoll(shareConsumer, 11500L, 0);  // This will send 
the acks but not return next 5 records (10-15)
+            assertEquals(10, acknowledgementsCommitted.get());
+            assertEquals(0, records.count());
+            verifyYammerMetricCount("ackType=Renew", 5);
+
+            // Renewal duration passed, now records will be back.
+            records = waitedPoll(shareConsumer, 2500L, 5);  // Renewed records 
as well as 10-15 records.
+            assertEquals(5, records.count());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+            }
+
+            shareConsumer.commitSync();
+
+            records = waitedPoll(shareConsumer, 2500L, 5);  // 10-15 records.
+            assertEquals(5, records.count());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+            }
+
+            shareConsumer.commitSync();
+
+            // Initial - 5 renew + 5 accept, Subsequent - 5 renewed accepted + 
5 fresh accepted (10-15)
+            assertEquals(20, acknowledgementsCommitted.get());
+        }
+        verifyYammerMetricCount("ackType=Renew", 5);
+    }
+
+    private void verifyYammerMetricCount(String filterString, int count) {
+        com.yammer.metrics.core.Metric renewAck = 
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()

Review Comment:
   nit: This isn't really a `renewAck` because the filter string is an 
argument. However, that's such a tiny nit that we can fix it next time through 
this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to