showuon commented on code in PR #15561:
URL: https://github.com/apache/kafka/pull/15561#discussion_r1533168785
########## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##########

```diff
@@ -152,17 +160,19 @@ public void run() {
                 consumer.seekToEnd(emptyList());
                 consumer.commitSync();
```

Review Comment:
   Should we reset the `retries` to 0 here?

########## examples/src/main/java/kafka/examples/Consumer.java: ##########

```diff
@@ -145,6 +145,7 @@ public KafkaConsumer<Integer, String> createKafkaConsumer() {
         }
         // sets the reset offset policy in case of invalid or no offset
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
```

Review Comment:
   Why should we add this?

########## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##########

```diff
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
```

Review Comment:
   Since we will not always retry, maybe rename to `maybeRetry`?
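For context on the `MAX_POLL_RECORDS_CONFIG` question above: `ConsumerConfig.MAX_POLL_RECORDS_CONFIG` resolves to the `max.poll.records` key, which caps how many records a single `poll()` returns. A standalone sketch of the equivalent configuration, using literal config keys so it runs without kafka-clients on the classpath (the rationale comment is a guess at the intent, not the PR author's stated reason):

```java
import java.util.Properties;

public class ConsumerProps {
    // Hypothetical helper mirroring the two config lines under review.
    static Properties build() {
        Properties props = new Properties();
        // sets the reset offset policy in case of invalid or no offset
        props.put("auto.offset.reset", "earliest"); // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        // the addition under review; presumably so a poison record fails
        // on its own rather than aborting a whole polled batch
        props.put("max.poll.records", "1");         // ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        return props;
    }
}
```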
########## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##########

```diff
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();
+            });
+            retries = 0;
+        }
```

Review Comment:
   I think we should also have an `else` case to print some error messages for the retries < 0 case.
########## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##########

```diff
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();
```

Review Comment:
   Is this necessary? I thought we'll move to the next offset even if we didn't do this seek. No?

########## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##########

```diff
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
```

Review Comment:
   Also, since this is an example for "general users", it'd be better to add some comments for this method. Something like what this method does, why we need it, and what happens when we exceed the max retries, etc.
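The suggestions in this thread (the `maybeRetry` rename, an explicit branch for negative counts, and a doc comment) can be illustrated with a minimal, self-contained sketch of the retry-counter control flow. This is a hypothetical illustration, not the PR's code: `MAX_RETRIES = 3` is an assumed value, and the Kafka seek/commit calls are reduced to comments so the sketch runs standalone:

```java
public class RetryPolicy {
    // Assumed value for illustration; the PR defines its own MAX_RETRIES.
    static final int MAX_RETRIES = 3;

    /**
     * Called after a failed transaction; returns the updated retry counter.
     * 1..MAX_RETRIES: the caller should rewind the consumer to the last
     * committed offsets and retry the same batch. Above MAX_RETRIES: the
     * caller should skip the bad records (e.g. send them to a DLQ) and the
     * counter resets to 0. A negative input is an invalid state and is
     * reported, per the reviewer's suggestion.
     */
    static int maybeRetry(int retries) {
        if (retries < 0) {
            // surface the invalid state instead of silently retrying
            System.err.printf("Invalid retry count %d, resetting to 0%n", retries);
            return 0;
        }
        retries++;
        if (retries <= MAX_RETRIES) {
            // retry: seek back to the committed offsets (Kafka calls omitted)
            return retries;
        }
        // give up: skip the records and reset the counter for the next batch
        return 0;
    }
}
```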
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org