AndrewJSchofield commented on code in PR #20148:
URL: https://github.com/apache/kafka/pull/20148#discussion_r2216370644


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java:
##########
@@ -123,6 +125,29 @@ public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type)
         throw new IllegalStateException("The record cannot be acknowledged.");
     }
 
+    /**
+     * Acknowledge a single record by its topic, partition and offset in the current batch.
+     *
+     * @param topic     The topic of the record to acknowledge
+     * @param partition The partition of the record
+     * @param offset    The offset of the record
+     * @param type      The acknowledgment type which indicates whether it was processed successfully
+     */
+    public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
+        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
+            TopicIdPartition tip = tipBatch.getKey();
+            KafkaException shareException = tipBatch.getValue().getException();
+            if (tip.topic().equals(topic) && (tip.partition() == partition) &&
+                shareException instanceof RecordDeserializationException &&

Review Comment:
   I don't like the `instanceof` followed by the cast. One way around this, I 
suppose, would be for `ShareInFlightBatch` to hold an exception of a known 
type, `ShareFetchException` I guess. Then `ShareFetchException` could have 
methods which extract the topic, partition and offset from the 
`RecordDeserializationException`, and then you just need 
`shareException.offset() == offset`. wdyt?
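
   A rough sketch of that idea, using stand-in classes rather than the real 
Kafka types. `DeserializationFailure` stands in for 
`RecordDeserializationException`, while `TypedShareException` and its `matches` 
helper are hypothetical names, not something already in the PR. The point is 
only that the type check happens once, inside the wrapper, so the caller 
compares plain values.

```java
public class TypedExceptionSketch {

    /** Stand-in for RecordDeserializationException (hypothetical, not the Kafka class). */
    static class DeserializationFailure extends RuntimeException {
        final String topic;
        final int partition;
        final long offset;

        DeserializationFailure(String topic, int partition, long offset) {
            super("Failed to deserialize " + topic + "-" + partition + " at offset " + offset);
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical typed wrapper that the in-flight batch would store. */
    static class TypedShareException extends RuntimeException {
        private final String topic;
        private final int partition;
        private final long offset;

        TypedShareException(RuntimeException cause) {
            super(cause);
            // The only type check lives here, at construction time.
            if (cause instanceof DeserializationFailure) {
                DeserializationFailure failure = (DeserializationFailure) cause;
                this.topic = failure.topic;
                this.partition = failure.partition;
                this.offset = failure.offset;
            } else {
                // Assumption: other causes carry no record position.
                this.topic = null;
                this.partition = -1;
                this.offset = -1L;
            }
        }

        long offset() {
            return offset;
        }

        /** True only when the wrapped failure points at exactly this record. */
        boolean matches(String topic, int partition, long offset) {
            return this.topic != null
                && this.topic.equals(topic)
                && this.partition == partition
                && this.offset == offset;
        }
    }

    public static void main(String[] args) {
        TypedShareException e =
            new TypedShareException(new DeserializationFailure("orders", 0, 2L));
        System.out.println(e.matches("orders", 0, 2L));
        System.out.println(e.matches("orders", 1, 2L));
    }
}
```

   With something like this, the loop in `acknowledge` would reduce to a 
single `matches(topic, partition, offset)` call per batch.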



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -843,6 +846,141 @@ public void testExplicitAcknowledgeThrowsNotInBatch() {
         }
     }
 
+    @ClusterTest
+    public void testExplicitOverrideAcknowledgeCorruptedMessage() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
+                null,
+                mockErrorDeserializer(3))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.send(record3);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(2, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            assertEquals(1L, secondRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+            shareConsumer.acknowledge(secondRecord);
+
+            RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+            shareConsumer.commitSync();
+
+            // The corrupted record was automatically released, so we can still obtain it.
+            rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+
+            // Reject this record
+            shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();
+
+            records = shareConsumer.poll(Duration.ZERO);
+            assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsNotException() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
+
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
+            assertEquals(0L, consumedRecord.offset());
+
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT));
+
+            shareConsumer.acknowledge(consumedRecord);
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsParametersError() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
+                null,
+                mockErrorDeserializer(2))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+
+            final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(1, rde.offset());
+
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT));
+
+            // Reject this record
+            shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();

Review Comment:
   And what happens if you repeat the acknowledge call after the commit?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchException.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.SerializationException;
+
+public class ShareFetchException extends SerializationException {
+
+    private final KafkaException origin;
+
+    private final ShareFetch<?, ?> shareFetch;
+
+    public ShareFetchException(KafkaException exception, ShareFetch<?, ?> shareFetch) {

Review Comment:
   I suggest `ShareFetchException(ShareFetch<?, ?> shareFetch, KafkaException 
cause)`. If anything, the origin seems to be the `ShareFetch` to me.
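
   For illustration, a sketch of that constructor shape with stand-in types: 
`FetchContext` stands in for `ShareFetch<?, ?>` and `FetchFailure` for 
`ShareFetchException`, and both names are hypothetical. It also assumes the 
cause is handed to the superclass so that `getCause()` can replace a separate 
`origin` field, which goes a little beyond what the comment asks for.

```java
public class CauseConstructorSketch {

    /** Stand-in for ShareFetch<?, ?> (hypothetical). */
    static class FetchContext {
        final String description;

        FetchContext(String description) {
            this.description = description;
        }
    }

    /** Stand-in for ShareFetchException with the suggested parameter order. */
    static class FetchFailure extends RuntimeException {
        private final FetchContext fetch;

        FetchFailure(FetchContext fetch, RuntimeException cause) {
            // Passing the cause up means getCause() returns it; no extra field is needed.
            super(cause);
            this.fetch = fetch;
        }

        FetchContext fetch() {
            return fetch;
        }
    }

    public static void main(String[] args) {
        FetchContext context = new FetchContext("one fetched batch");
        RuntimeException cause = new IllegalStateException("bad record");
        FetchFailure failure = new FetchFailure(context, cause);
        System.out.println(failure.getCause() == cause);
        System.out.println(failure.fetch().description);
    }
}
```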



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java:
##########
@@ -110,7 +112,7 @@ public boolean isEmpty() {
      * Acknowledge a single record in the current batch.
      *
      * @param record The record to acknowledge
-     * @param type The acknowledge type which indicates whether it was processed successfully
+     * @param type The acknowledgment type which indicates whether it was processed successfully
      */
     public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type) {

Review Comment:
   Let's make this `type` parameter final too.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -507,6 +507,30 @@ public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
         delegate.acknowledge(record, type);
     }
 
+    /**
+     * Acknowledge delivery of a specific record by its topic, partition, and offset, indicating whether
+     * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
+     * {@link #commitAsync()} or {@link #poll(Duration)} call.
+     * <p>
+     * This method provides an alternative to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} when
+     * the full record is unavailable. It is typically used for manual offset management scenarios.
+     * <p>
+     * This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
+     *
+     * @param topic     The topic of the record to acknowledge
+     * @param partition The partition of the record to acknowledge
+     * @param offset    The offset of the record to acknowledge
+     * @param type      The acknowledgement type which indicates whether it was processed successfully
+     *
+     * @throws IllegalStateException if the specified record is not pending acknowledgement,

Review Comment:
   The KIP has a javadoc block which is a bit more accurate. Please take it 
from the KIP. (The "manual offset management scenarios" bit is strange.)


