showuon commented on code in PR #15561:
URL: https://github.com/apache/kafka/pull/15561#discussion_r1533532443


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, 
String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, 
ConsumerRecords<Integer, String> records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the 
committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = 
consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a 
DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", 
MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), 
record.partition()), record.offset() + 1);
+                consumer.commitSync();
+            });
+            retries = 0;
+        }

Review Comment:
   Yes, I know, so if that happened, it should be an fatal error, right? 
Otherwise, we don't need to check `retries > 0` at the first `if` condition. 
WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to