[GitHub] [incubator-hudi] garyli1019 commented on a change in pull request #1377: [HUDI-663] Fix HoodieDeltaStreamer offset not handled correctly
garyli1019 commented on a change in pull request #1377: [HUDI-663] Fix HoodieDeltaStreamer offset not handled correctly
URL: https://github.com/apache/incubator-hudi/pull/1377#discussion_r391350401

## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java ##
@@ -180,7 +180,7 @@ public KafkaOffsetGen(TypedProperties props) {
         .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
     // Determine the offset ranges to read from
-    if (lastCheckpointStr.isPresent()) {
+    if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {

Review comment:
In this case, I think the user could set `cfg.checkpoint = xxx` to reset the checkpoint. It would concern me if the DeltaStreamer reset the checkpoint automatically without my being aware of it.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
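The precedence suggested in the comment above is that an explicitly supplied checkpoint wins over the one stored in the last commit, so a reset only happens when the user asks for it. A minimal Java sketch follows. `CheckpointOverride` and `resolve` are hypothetical names used for illustration. This is not Hudi's actual DeltaSync logic.

```java
import java.util.Optional;

/**
 * Hypothetical sketch: a checkpoint supplied explicitly by the user (in the
 * spirit of `cfg.checkpoint`) takes precedence over the one stored in the
 * last commit. Not Hudi's real implementation.
 */
final class CheckpointOverride {

  private CheckpointOverride() {}

  /**
   * @param userCheckpoint      checkpoint the user passed explicitly, if any
   * @param committedCheckpoint checkpoint recorded in the last commit's metadata, if any
   * @return the checkpoint to resume from, or empty if neither is available
   */
  static Optional<String> resolve(Optional<String> userCheckpoint,
                                  Optional<String> committedCheckpoint) {
    // An explicit user choice always wins, so a reset is a deliberate action.
    if (userCheckpoint.isPresent()) {
      return userCheckpoint;
    }
    // Otherwise resume exactly where the last commit left off.
    return committedCheckpoint;
  }
}
```

For example, `resolve(Optional.of("topic,0:500"), Optional.of(""))` returns `topic,0:500`. An empty stored checkpoint is therefore bypassed only when the user explicitly supplies a replacement.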
garyli1019 commented on a change in pull request #1377: [HUDI-663] Fix HoodieDeltaStreamer offset not handled correctly
URL: https://github.com/apache/incubator-hudi/pull/1377#discussion_r391344867

## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java ##
@@ -180,7 +180,7 @@ public KafkaOffsetGen(TypedProperties props) {
         .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
     // Determine the offset ranges to read from
-    if (lastCheckpointStr.isPresent()) {
+    if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {

Review comment:
I agree with @bvaradar here. I tried the following change to the commit-metadata lookup:

```java
// Only resume from the stored checkpoint when it is non-null and non-empty
else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null
    && !commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
  resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
}
```

The unit tests passed. If you mean the case where the checkpoint was empty, the application threw an exception. I think that should be the desired behavior. WDYT?
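One way to make an empty checkpoint fail loudly instead of being silently treated as "no checkpoint" is to reject it at parse time. The sketch below assumes that the Kafka checkpoint string has the form `topic,0:100,1:250`: a topic name followed by `partition:offset` pairs. `KafkaCheckpointParser` is a hypothetical helper and is not part of Hudi.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: parse a checkpoint assumed to look like "topic,0:100,1:250"
 * and reject empty or malformed input instead of guessing a start offset.
 */
final class KafkaCheckpointParser {

  private KafkaCheckpointParser() {}

  static Map<Integer, Long> parse(String checkpoint) {
    if (checkpoint == null || checkpoint.trim().isEmpty()) {
      throw new IllegalArgumentException("Checkpoint is empty; refusing to guess a start offset");
    }
    String[] parts = checkpoint.split(",");
    if (parts.length < 2) {
      throw new IllegalArgumentException("Checkpoint has no partition offsets: " + checkpoint);
    }
    Map<Integer, Long> offsets = new HashMap<>();
    // parts[0] is the topic name; every later entry is "partition:offset".
    for (int i = 1; i < parts.length; i++) {
      String[] kv = parts[i].split(":");
      if (kv.length != 2) {
        throw new IllegalArgumentException("Malformed partition entry: " + parts[i]);
      }
      // NumberFormatException is itself an IllegalArgumentException.
      offsets.put(Integer.parseInt(kv[0].trim()), Long.parseLong(kv[1].trim()));
    }
    return offsets;
  }
}
```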
garyli1019 commented on a change in pull request #1377: [HUDI-663] Fix HoodieDeltaStreamer offset not handled correctly
URL: https://github.com/apache/incubator-hudi/pull/1377#discussion_r389089300

## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java ##
@@ -180,7 +180,7 @@ public KafkaOffsetGen(TypedProperties props) {
         .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
     // Determine the offset ranges to read from
-    if (lastCheckpointStr.isPresent()) {
+    if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {

Review comment:
Right. As you mentioned, user mistakes could still lead to an empty checkpoint. From a user's perspective, if the last commit contains an empty checkpoint, I would prefer the job to fail rather than have the checkpoint reset automatically. Throwing an exception when the checkpoint is empty makes more sense to me, and it lets the user decide whether to reset. WDYT?
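The proposed behavior separates two cases: a checkpoint that is absent, and one that is present but empty. A minimal sketch of that distinction follows. `StartPositionPolicy` and `Kind` are hypothetical names. The actual reset strategy, for example `LATEST`, would come from the job's configuration.

```java
import java.util.Optional;

/**
 * Hypothetical sketch of a fail-fast policy: a missing checkpoint (e.g. the very
 * first run) falls back to the configured reset strategy, while a checkpoint that
 * is present but empty aborts the job so the user can decide what to do.
 */
final class StartPositionPolicy {

  private StartPositionPolicy() {}

  /** RESUME: continue from the checkpoint; RESET_STRATEGY: apply the configured reset. */
  enum Kind { RESUME, RESET_STRATEGY }

  static Kind decide(Optional<String> lastCheckpoint) {
    if (!lastCheckpoint.isPresent()) {
      // No previous commit carried a checkpoint, so a reset is expected here.
      return Kind.RESET_STRATEGY;
    }
    if (lastCheckpoint.get().trim().isEmpty()) {
      // Something wrote an empty checkpoint; resetting silently could skip data.
      throw new IllegalStateException(
          "Last commit stored an empty checkpoint; set a checkpoint explicitly to resume or reset");
    }
    return Kind.RESUME;
  }
}
```

Treating absence as a normal first run keeps new pipelines working. The exception message for the empty case points the user toward setting a checkpoint explicitly.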
garyli1019 commented on a change in pull request #1377: [HUDI-663] Fix HoodieDeltaStreamer offset not handled correctly
URL: https://github.com/apache/incubator-hudi/pull/1377#discussion_r388705608

## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java ##
@@ -180,7 +180,7 @@ public KafkaOffsetGen(TypedProperties props) {
         .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
     // Determine the offset ranges to read from
-    if (lastCheckpointStr.isPresent()) {
+    if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {

Review comment:
I think this could hide some serious errors. For example, suppose the DeltaStreamer is consuming a Kafka source and a hidden bug causes an empty checkpoint to be stored. The next run will simply ignore the empty checkpoint and reset to `LATEST`. Every record that had not yet been consumed is then skipped, which means data loss.
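The size of that loss is easy to estimate. Every record between the offset in the lost checkpoint and the partition's current end offset is skipped. The hypothetical helper below computes this per partition. It assumes the checkpoint stores the next offset to read, and it ignores gaps caused by log compaction or transaction markers.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: count, per partition, the records that would never be read
 * if the next run ignored the stored checkpoint and jumped to the latest offset.
 * Approximate: ignores offset gaps from compaction or transaction markers.
 */
final class ResetLossEstimator {

  private ResetLossEstimator() {}

  /**
   * @param lastConsumed next offset to read per partition, per the lost checkpoint
   * @param latest       current end offset per partition
   * @return records per partition that a reset to LATEST would skip
   */
  static Map<Integer, Long> skippedOnReset(Map<Integer, Long> lastConsumed,
                                           Map<Integer, Long> latest) {
    Map<Integer, Long> skipped = new HashMap<>();
    for (Map.Entry<Integer, Long> e : lastConsumed.entrySet()) {
      // If the end offset is unknown, assume nothing new was written.
      long end = latest.getOrDefault(e.getKey(), e.getValue());
      // Everything between the checkpoint and the end offset is never read.
      skipped.put(e.getKey(), Math.max(0L, end - e.getValue()));
    }
    return skipped;
  }
}
```

For instance, suppose partition 0 is checkpointed at offset 100 and its current end offset is 180. A reset to `LATEST` would then skip 80 records on that partition.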