zheguang commented on code in PR #20515:
URL: https://github.com/apache/kafka/pull/20515#discussion_r2390031494


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -255,4 +297,37 @@ private static int byteSize(byte[] bytes) {
     private boolean isUncommitted(Long offset) {
         return offset == null || offset < 0;
     }
+
+    private void handleTopicResetIfAny() {
+        if (!autoRecoverOnReset) return;
+        // Describe all topics currently subscribed
+        Set<String> topics = consumer.subscription();
+        if (topics.isEmpty()) return;
+        try {
+            DescribeTopicsResult res = sourceAdmin.describeTopics(topics);
+            Map<String, TopicDescription> desc = res.all().get();

Review Comment:
   `DescribeTopicResult#all` has private access.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -85,6 +103,15 @@ public void start(Map<String, String> props) {
         Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
         initializeConsumer(taskTopicPartitions);
 
+        // Fault tolerance configuration
+        this.failOnTruncation = 
Boolean.parseBoolean(props.getOrDefault("mirrorsource.fail.on.truncation", 
"true"));
+        this.autoRecoverOnReset = 
Boolean.parseBoolean(props.getOrDefault("mirrorsource.auto.recover.on.reset", 
"true"));
+        this.topicResetRetryMs = 
Long.parseLong(props.getOrDefault("mirrorsource.topic.reset.retry.ms", "5000"));
+
+        // Build AdminClient for source cluster (same configs as source 
consumer)
+        Map<String, Object> adminProps = new 
HashMap<>(config.sourceConsumerConfig("replication-consumer"));

Review Comment:
   `HashMap` requires import.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -255,4 +297,37 @@ private static int byteSize(byte[] bytes) {
     private boolean isUncommitted(Long offset) {
         return offset == null || offset < 0;
     }
+
+    private void handleTopicResetIfAny() {
+        if (!autoRecoverOnReset) return;
+        // Describe all topics currently subscribed
+        Set<String> topics = consumer.subscription();
+        if (topics.isEmpty()) return;
+        try {
+            DescribeTopicsResult res = sourceAdmin.describeTopics(topics);
+            Map<String, TopicDescription> desc = res.all().get();
+            for (Map.Entry<String, TopicDescription> e : desc.entrySet()) {
+                String t = e.getKey();
+                Uuid current = e.getValue().topicId();
+                Uuid previous = topicIds.putIfAbsent(t, current);
+                if (previous != null && !previous.equals(current)) {
+                    FT_LOG.warn("RESET_DETECTED: Topic {} recreated (oldId={} 
newId={}). Seeking to beginning.", t, previous, current);
+                    // Seek to beginning for all assigned partitions of this 
topic
+                    Set<TopicPartition> toSeek = consumer.assignment().stream()
+                        .filter(tp -> tp.topic().equals(t))
+                        .collect(Collectors.toSet());
+                    if (!toSeek.isEmpty()) consumer.seekToBeginning(toSeek);
+                }
+            }
+        } catch (InterruptedException ie) {

Review Comment:
   Unclear who throws in the try block.



##########
mm2-fault-tolerance.patch:
##########
@@ -0,0 +1,120 @@
+--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java

Review Comment:
   This file shouldn't be checked in.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -148,6 +176,20 @@ public List<SourceRecord> poll() {
                 log.trace("Polled {} records from {}.", sourceRecords.size(), 
records.partitions());
                 return sourceRecords;
             }
+        } catch (OffsetOutOfRangeException oore) {
+            if (!failOnTruncation) throw oore;
+            // Fail-fast with precise diagnostics
+            Map<TopicPartition, Long> earliest = 
consumer.beginningOffsets(consumer.assignment());
+            String msg = "TRUNCATION_DETECTED: Source offsets are out of range 
(likely retention purge). " +
+                         "Earliest per partition=" + earliest + ", 
assignment=" + consumer.assignment();
+            FT_LOG.error(msg, oore);
+            throw new ConnectException(msg, oore);
+        } catch (UnknownTopicOrPartitionException utpe) {
+            if (!autoRecoverOnReset) throw utpe;
+            FT_LOG.warn("TOPIC_RESET_SUSPECTED: {}. Will retry metadata and 
resubscribe in {} ms.", utpe.getMessage(), topicResetRetryMs);
+            sleep(topicResetRetryMs);
+            handleTopicResetIfAny();
+            return Collections.emptyList();

Review Comment:
   `Collections` requires import.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -255,4 +297,37 @@ private static int byteSize(byte[] bytes) {
     private boolean isUncommitted(Long offset) {
         return offset == null || offset < 0;
     }
+
+    private void handleTopicResetIfAny() {
+        if (!autoRecoverOnReset) return;
+        // Describe all topics currently subscribed
+        Set<String> topics = consumer.subscription();
+        if (topics.isEmpty()) return;
+        try {
+            DescribeTopicsResult res = sourceAdmin.describeTopics(topics);
+            Map<String, TopicDescription> desc = res.all().get();
+            for (Map.Entry<String, TopicDescription> e : desc.entrySet()) {
+                String t = e.getKey();
+                Uuid current = e.getValue().topicId();
+                Uuid previous = topicIds.putIfAbsent(t, current);
+                if (previous != null && !previous.equals(current)) {
+                    FT_LOG.warn("RESET_DETECTED: Topic {} recreated (oldId={} 
newId={}). Seeking to beginning.", t, previous, current);
+                    // Seek to beginning for all assigned partitions of this 
topic
+                    Set<TopicPartition> toSeek = consumer.assignment().stream()
+                        .filter(tp -> tp.topic().equals(t))
+                        .collect(Collectors.toSet());
+                    if (!toSeek.isEmpty()) consumer.seekToBeginning(toSeek);
+                }
+            }
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException ee) {

Review Comment:
   Ditto.  Unclear who throws in the try block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to