AndrewJSchofield commented on code in PR #20148: URL: https://github.com/apache/kafka/pull/20148#discussion_r2216370644
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java: ########## @@ -123,6 +125,29 @@ public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type) throw new IllegalStateException("The record cannot be acknowledged."); } + /** + * Acknowledge a single record by its topic, partition and offset in the current batch. + * + * @param topic The topic of the record to acknowledge + * @param partition The partition of the record + * @param offset The offset of the record + * @param type The acknowledgment type which indicates whether it was processed successfully + */ + public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) { + for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) { + TopicIdPartition tip = tipBatch.getKey(); + KafkaException shareException = tipBatch.getValue().getException(); + if (tip.topic().equals(topic) && (tip.partition() == partition) && + shareException instanceof RecordDeserializationException && Review Comment: I don't like the `instanceof` and then the cast. One way around this, I suppose, would be for `ShareInFlightBatch` to have an exception which has a known type, `ShareFetchException` I guess. Then `ShareFetchException` could have methods which extract the topic, partition and offset from the `RecordDeserializationException`, and then you just need `shareException.offset() == offset`. wdyt? 
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ########## @@ -843,6 +846,141 @@ public void testExplicitAcknowledgeThrowsNotInBatch() { } } + @ClusterTest + public void testExplicitOverrideAcknowledgeCorruptedMessage() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer<byte[], byte[]> producer = createProducer(); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT), + null, + mockErrorDeserializer(3))) { + + ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record1); + producer.send(record2); + producer.send(record3); + producer.flush(); + + shareConsumer.subscribe(Set.of(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60)); + assertEquals(2, records.count()); + Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator(); + + ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + assertEquals(0L, firstRecord.offset()); + assertEquals(1L, secondRecord.offset()); + shareConsumer.acknowledge(firstRecord); + shareConsumer.acknowledge(secondRecord); + + RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60))); + assertEquals(2, rde.offset()); + shareConsumer.commitSync(); + + // The corrupted record was automatically released, so we can still obtain it. 
+ rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60))); + assertEquals(2, rde.offset()); + + // Reject this record + shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT); + shareConsumer.commitSync(); + + records = shareConsumer.poll(Duration.ZERO); + assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); + } + } + + @ClusterTest + public void testExplicitAcknowledgeOffsetThrowsNotException() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer<byte[], byte[]> producer = createProducer(); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { + + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Set.of(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60)); + assertEquals(1, records.count()); + ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0); + assertEquals(0L, consumedRecord.offset()); + + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT)); + + shareConsumer.acknowledge(consumedRecord); + verifyShareGroupStateTopicRecordsProduced(); + } + } + + @ClusterTest + public void testExplicitAcknowledgeOffsetThrowsParametersError() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer<byte[], byte[]> producer = createProducer(); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT), + null, + mockErrorDeserializer(2))) { + + ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record1); + producer.send(record2); + producer.flush(); + + shareConsumer.subscribe(Set.of(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60)); + assertEquals(1, records.count()); + Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator(); + + ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); + assertEquals(0L, firstRecord.offset()); + shareConsumer.acknowledge(firstRecord); + + final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60))); + assertEquals(1, rde.offset()); + + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT)); + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT)); + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT)); + + // Reject this record + shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT); + shareConsumer.commitSync(); Review Comment: And what happens if you repeat the acknowledge call after the commit? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchException.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.SerializationException; + +public class ShareFetchException extends SerializationException { + + private final KafkaException origin; + + private final ShareFetch<?, ?> shareFetch; + + public ShareFetchException(KafkaException exception, ShareFetch<?, ?> shareFetch) { Review Comment: I suggest `ShareFetchException(ShareFetch<?, ?> shareFetch, KafkaException cause)`. If anything, the origin seems to be the `ShareFetch` to me. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java: ########## @@ -110,7 +112,7 @@ public boolean isEmpty() { * Acknowledge a single record in the current batch. * * @param record The record to acknowledge - * @param type The acknowledge type which indicates whether it was processed successfully + * @param type The acknowledgment type which indicates whether it was processed successfully */ public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type) { Review Comment: Let's make this `type` parameter final too. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -507,6 +507,30 @@ public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) { delegate.acknowledge(record, type); } + /** + * Acknowledge delivery of a specific record by its topic, partition, and offset, indicating whether + * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()}, + * {@link #commitAsync()} or {@link #poll(Duration)} call. + * <p> + * This method provides an alternative to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} when + * the full record is unavailable. It is typically used for manual offset management scenarios. + * <p> + * This method can only be used if the consumer is using <b>explicit acknowledgement</b>. + * + * @param topic The topic of the record to acknowledge + * @param partition The partition of the record to acknowledge + * @param offset The offset of the record to acknowledge + * @param type The acknowledgement type which indicates whether it was processed successfully + * + * @throws IllegalStateException if the specified record is not pending acknowledgement, Review Comment: The KIP has a javadoc block which is a bit more accurate. Please take it from the KIP. (The "manual offset management scenarios" bit is strange.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org