This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f120427  HUDI-105 : Fix up offsets not available on leader exception (#650)
f120427 is described below

commit f120427607986de82f99f2ba9a2eb0ff136c2bae
Author: leiline <lamlee1...@outlook.com>
AuthorDate: Fri May 24 10:32:31 2019 +0800

    HUDI-105 : Fix up offsets not available on leader exception (#650)
    
    * Fix up offsets not available on leader exception
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
index 947f3c4..5a4b727 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
@@ -204,8 +204,9 @@ public class KafkaOffsetGen {
 
     // Determine the offset ranges to read from
     HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets;
     if (lastCheckpointStr.isPresent()) {
-      fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+      fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
     } else {
       KafkaResetOffsetStrategies autoResetValue =  KafkaResetOffsetStrategies.valueOf(
               props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
@@ -235,6 +236,26 @@ public class KafkaOffsetGen {
     return offsetRanges;
   }
 
+  // check up checkpoint offsets is valid or not, if true,  return checkpoint offsets,
+  // else return earliest offsets
+  private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(
+          KafkaCluster cluster,
+          Optional<String> lastCheckpointStr,
+          Set<TopicAndPartition> topicPartitions) {
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
+            CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
+            new HashMap(ScalaHelpers.toJavaMap(
+            cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+
+    boolean checkpointOffsetReseter = checkpointOffsets.entrySet()
+            .stream()
+            .anyMatch(offset -> offset.getValue().offset()
+                    < earliestOffsets.get(offset.getKey()).offset());
+    return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+  }
+
+
   public String getTopicName() {
     return topicName;
   }

Reply via email to