This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fa9e37df92 [HUDI-6031] fix bug: checkpoint lost after changing cow to mor (#8378)
1fa9e37df92 is described below

commit 1fa9e37df92c7a38c3909ef1e71f21bad03b3d84
Author: kongwei <kong...@pku.edu.cn>
AuthorDate: Sun Apr 30 15:05:37 2023 +0800

    [HUDI-6031] fix bug: checkpoint lost after changing cow to mor (#8378)
    
    Co-authored-by: wei.kong <wei.k...@shopee.com>
---
 .../testsuite/HoodieDeltaStreamerWrapper.java      |  2 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 48 ++++++++++---------
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 54 ++++++++++++++++++++++
 3 files changed, 82 insertions(+), 22 deletions(-)
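
The gist of the fix: DeltaSync used to read checkpoints from the .commit
timeline for COW tables and the .deltacommit timeline for MOR tables, so a
table flipped from COW to MOR saw an empty deltacommit timeline on its first
MOR batch and lost its checkpoint. The patch reads from the combined commits
timeline for both table types. A minimal sketch of the new selection,
condensed from the DeltaSync hunks below (the wrapper class and method name
are illustrative, not part of the patch):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.util.Option;

    class CommitsTimelineSketch {
      // One branch now serves both COW and MOR: getCommitsTimeline() spans
      // .commit and .deltacommit instants, and a pure COW table simply
      // contains no deltacommits.
      static Option<HoodieTimeline> completedCommitsTimeline(HoodieTableMetaClient meta) {
        return Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
      }
    }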

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 5ff91fa5209..632bbecf10d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -78,7 +78,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
   public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
     DeltaSync service = getDeltaSync();
     service.refreshTimeline();
-    return service.readFromSource(service.getCommitTimelineOpt());
+    return service.readFromSource(service.getCommitsTimelineOpt());
   }
 
   public DeltaSync getDeltaSync() {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index e2efe1a4e35..c59510f3676 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -227,11 +227,11 @@ public class DeltaSync implements Serializable, Closeable {
   private transient Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient;
 
   /**
-   * Timeline with completed commits.
+   * Timeline with completed commits, including both .commit and .deltacommit.
    */
-  private transient Option<HoodieTimeline> commitTimelineOpt;
+  private transient Option<HoodieTimeline> commitsTimelineOpt;
 
-  // all commits timeline
+  // all commits timeline, including all (commits, delta commits, compaction, clean, savepoint, rollback, replace commits, index)
   private transient Option<HoodieTimeline> allCommitsTimelineOpt;
 
   /**
@@ -320,11 +320,9 @@ public class DeltaSync implements Serializable, Closeable {
             .build();
         switch (meta.getTableType()) {
           case COPY_ON_WRITE:
-            this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
-            this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
-            break;
           case MERGE_ON_READ:
-            this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
+            // we can use getCommitsTimeline for both COW and MOR here, because for COW there is no deltacommit
+            this.commitsTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
             this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
             break;
           default:
@@ -361,7 +359,7 @@ public class DeltaSync implements Serializable, Closeable {
   }
 
   private void initializeEmptyTable() throws IOException {
-    this.commitTimelineOpt = Option.empty();
+    this.commitsTimelineOpt = Option.empty();
     this.allCommitsTimelineOpt = Option.empty();
     String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
     HoodieTableMetaClient.withPropertyBuilder()
@@ -398,7 +396,7 @@ public class DeltaSync implements Serializable, Closeable {
     // Refresh Timeline
     refreshTimeline();
 
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitTimelineOpt);
+    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitsTimelineOpt);
 
     if (null != srcRecordsWithCkpt) {
       // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
@@ -448,16 +446,16 @@ public class DeltaSync implements Serializable, Closeable {
   /**
    * Read from Upstream Source and apply transformation if needed.
    *
-   * @param commitTimelineOpt Timeline with completed commits
+   * @param commitsTimelineOpt Timeline with completed commits, including .commit and .deltacommit
    * @return Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> Input data read from upstream source, consists
    * of schemaProvider, checkpointStr and hoodieRecord
    * @throws Exception in case of any Exception
    */
-  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
+  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
     // Retrieve the previous round checkpoints, if any
     Option<String> resumeCheckpointStr = Option.empty();
-    if (commitTimelineOpt.isPresent()) {
-      resumeCheckpointStr = getCheckpointToResume(commitTimelineOpt);
+    if (commitsTimelineOpt.isPresent()) {
+      resumeCheckpointStr = getCheckpointToResume(commitsTimelineOpt);
     } else {
       // initialize the table for the first time.
       String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
@@ -650,16 +648,24 @@ public class DeltaSync implements Serializable, Closeable {
 
   /**
    * Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from.
-   * @param commitTimelineOpt commit timeline of interest.
+   *
+   * @param commitsTimelineOpt commits timeline of interest, including .commit and .deltacommit.
    * @return the checkpoint to resume from if applicable.
    * @throws IOException
    */
-  private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
+  private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
     Option<String> resumeCheckpointStr = Option.empty();
-    Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
+    // try to get the checkpoint from commits (including commit and deltacommit)
+    // in the COW-to-MOR migration case, the first batch of the deltastreamer would otherwise lose the checkpoint from the COW table, causing data loss
+    HoodieTimeline deltaCommitTimeline = commitsTimelineOpt.get().filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+    // the presence of deltacommits means this is a MOR table, so read the checkpoint from .deltacommit as before
+    if (!deltaCommitTimeline.empty()) {
+      commitsTimelineOpt = Option.of(deltaCommitTimeline);
+    }
+    Option<HoodieInstant> lastCommit = commitsTimelineOpt.get().lastInstant();
     if (lastCommit.isPresent()) {
      // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
-      Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get());
+      Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitsTimelineOpt.get());
       if (commitMetadataOption.isPresent()) {
         HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
        LOG.debug("Checkpoint reset from metadata: " + commitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
@@ -675,7 +681,7 @@ public class DeltaSync implements Serializable, Closeable {
           throw new HoodieDeltaStreamerException(
               "Unable to find previous checkpoint. Please double check if this 
table "
                   + "was indeed built via delta streamer. Last Commit :" + 
lastCommit + ", Instants :"
-                  + commitTimelineOpt.get().getInstants() + ", CommitMetadata="
+                  + commitsTimelineOpt.get().getInstants() + ", 
CommitMetadata="
                   + commitMetadata.toJsonString());
         }
         // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
@@ -807,7 +813,7 @@ public class DeltaSync implements Serializable, Closeable {
      String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
       if (errorTableWriter.isPresent()) {
         // Commit the error events triggered so far to the error table
-        Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
         boolean errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
         if (!errorTableSuccess) {
           switch (errorWriteFailureStrategy) {
@@ -1143,8 +1149,8 @@ public class DeltaSync implements Serializable, Closeable {
     return cfg;
   }
 
-  public Option<HoodieTimeline> getCommitTimelineOpt() {
-    return commitTimelineOpt;
+  public Option<HoodieTimeline> getCommitsTimelineOpt() {
+    return commitsTimelineOpt;
   }
 
   public HoodieIngestionMetrics getMetrics() {
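
For reference, the heart of the new getCheckpointToResume() behavior is the
timeline preference below, simplified from the hunk above (the standalone
class and method are illustrative; checkpoint parsing and error handling are
omitted):

    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    class CheckpointTimelineSketch {
      // Prefer deltacommits when any exist: the table is MOR, so checkpoints
      // live in .deltacommit metadata as before. Otherwise fall back to the
      // plain commits, which is all a freshly migrated COW-to-MOR table has
      // on its first MOR batch.
      static HoodieTimeline timelineForCheckpoint(HoodieTimeline commitsTimeline) {
        HoodieTimeline deltaCommits = commitsTimeline.filter(
            instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
        return deltaCommits.empty() ? commitsTimeline : deltaCommits;
      }
    }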
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d5d8dca34d6..82b7589448e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -143,6 +143,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
 import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
@@ -2600,6 +2601,59 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
   }
 
+  @Test
+  public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
+    String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";
+    // default table type is COW
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+    // change COW to MOR
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+            .setConf(new Configuration(fs.getConf()))
+            .setBasePath(cfg.targetBasePath)
+            .setLoadActiveTimelineOnLoad(false)
+            .build();
+    Properties hoodieProps = new Properties();
+    hoodieProps.load(fs.open(new Path(cfg.targetBasePath + "/.hoodie/hoodie.properties")));
+    LOG.info("old props: {}", hoodieProps);
+    hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
+    LOG.info("new props: {}", hoodieProps);
+    Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
+    HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
+
+    // continue deltastreamer
+    cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
+    TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    // there should be 1 deltacommit now
+    TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
+
+    // sync again now that the table type is already MOR
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
+    // total records should be 1900 now
+    TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+    counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    // there should be 2 deltacommits now
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+
+    // clean up
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+  }
+
   class TestDeltaSync extends DeltaSync {
 
    public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,

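For anyone reproducing the scenario outside the test, the COW-to-MOR flip
amounts to rewriting hoodie.properties, as the new test does above. A minimal
sketch, assuming an initialized Hadoop FileSystem fs and the table basePath
are in scope (this mirrors the test body and is not a general migration tool):

    import java.util.Properties;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.table.HoodieTableConfig;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    // Load the existing table config, flip the type, and rewrite
    // hoodie.properties in place via HoodieTableConfig.create().
    Properties hoodieProps = new Properties();
    try (FSDataInputStream in = fs.open(new Path(basePath + "/.hoodie/hoodie.properties"))) {
      hoodieProps.load(in);
    }
    hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
    HoodieTableConfig.create(fs,
        new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME), hoodieProps);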