showuon commented on code in PR #15561:
URL: https://github.com/apache/kafka/pull/15561#discussion_r1533168785


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -152,17 +160,19 @@ public void run() {
                     consumer.seekToEnd(emptyList());
                     consumer.commitSync();

Review Comment:
   Should we reset the `retries` to 0 here?
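   
   Something like this, perhaps (just a sketch; assumes `retries` is the local counter that `retry(...)` updates in `run()`):
   
   ```java
   consumer.seekToEnd(emptyList());
   consumer.commitSync();
   // hypothetical: give the next failure a fresh retry budget
   retries = 0;
   ```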



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -145,6 +145,7 @@ public KafkaConsumer<Integer, String> createKafkaConsumer() {
         }
         // sets the reset offset policy in case of invalid or no offset
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

Review Comment:
   Why should we add this?



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {

Review Comment:
   Since we will not always retry, maybe rename it to `maybeRetry`?
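   
   i.e. (signature otherwise unchanged):
   
   ```java
   private int maybeRetry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
   ```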



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();
+            });
+            retries = 0;
+        }

Review Comment:
   I think we should also have an `else` case that prints an error message for the `retries < 0` case.



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();

Review Comment:
   Is this necessary? I thought we'd move to the next offset even without this seek. No?
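   
   If the intent is only to commit past the bad record, committing an explicit offset map might be enough, e.g. (untested sketch):
   
   ```java
   consumer.commitSync(Collections.singletonMap(
       new TopicPartition(record.topic(), record.partition()),
       new OffsetAndMetadata(record.offset() + 1)));
   ```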



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    private int retry(int retries, KafkaConsumer<Integer, String> consumer, ConsumerRecords<Integer, String> records) {

Review Comment:
   Also, since this is an example for "general users", it'd be better to add a comment above this method explaining what it does, why it's needed, and what happens once the max retry count is exceeded, etc.
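   
   For example, something along these lines (wording is only a suggestion):
   
   ```java
   /**
    * Restores a safe consumer position after a failed transaction.
    * For the first MAX_RETRIES attempts it seeks back to the last committed
    * offset so the aborted batch is reprocessed; once MAX_RETRIES is exceeded
    * it skips the current records (logging each one) and resets the counter.
    *
    * @return the updated retry count (0 once the records have been skipped)
    */
   ```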


