This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 1fa9e37df92  [HUDI-6031] fix bug: checkpoint lost after changing cow to mor (#8378)
1fa9e37df92 is described below

commit 1fa9e37df92c7a38c3909ef1e71f21bad03b3d84
Author: kongwei <kong...@pku.edu.cn>
AuthorDate: Sun Apr 30 15:05:37 2023 +0800

    [HUDI-6031] fix bug: checkpoint lost after changing cow to mor (#8378)

    Co-authored-by: wei.kong <wei.k...@shopee.com>
---
 .../testsuite/HoodieDeltaStreamerWrapper.java      |  2 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 48 ++++++++++---------
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 54 ++++++++++++++++++++++
 3 files changed, 82 insertions(+), 22 deletions(-)

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 5ff91fa5209..632bbecf10d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -78,7 +78,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
   public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
     DeltaSync service = getDeltaSync();
     service.refreshTimeline();
-    return service.readFromSource(service.getCommitTimelineOpt());
+    return service.readFromSource(service.getCommitsTimelineOpt());
   }
 
   public DeltaSync getDeltaSync() {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index e2efe1a4e35..c59510f3676 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -227,11 +227,11 @@ public class DeltaSync implements Serializable, Closeable {
   private transient Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient;
 
   /**
-   * Timeline with completed commits.
+   * Timeline with completed commits, including both .commit and .deltacommit.
    */
-  private transient Option<HoodieTimeline> commitTimelineOpt;
+  private transient Option<HoodieTimeline> commitsTimelineOpt;
 
-  // all commits timeline
+  // all commits timeline, including all (commits, delta commits, compaction, clean, savepoint, rollback, replace commits, index)
   private transient Option<HoodieTimeline> allCommitsTimelineOpt;
 
   /**
@@ -320,11 +320,9 @@ public class DeltaSync implements Serializable, Closeable {
         .build();
     switch (meta.getTableType()) {
       case COPY_ON_WRITE:
-        this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
-        this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
-        break;
       case MERGE_ON_READ:
-        this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
+        // we can use getCommitsTimeline for both COW and MOR here, because for COW there is no deltacommit
+        this.commitsTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
         this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
         break;
       default:
@@ -361,7 +359,7 @@ public class DeltaSync implements Serializable, Closeable {
   }
 
   private void initializeEmptyTable() throws IOException {
-    this.commitTimelineOpt = Option.empty();
+    this.commitsTimelineOpt = Option.empty();
     this.allCommitsTimelineOpt = Option.empty();
     String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
     HoodieTableMetaClient.withPropertyBuilder()
@@ -398,7 +396,7 @@ public class DeltaSync implements Serializable, Closeable {
 
     // Refresh Timeline
     refreshTimeline();
 
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitTimelineOpt);
+    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitsTimelineOpt);
 
     if (null != srcRecordsWithCkpt) {
       // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
@@ -448,16 +446,16 @@ public class DeltaSync implements Serializable, Closeable {
   /**
    * Read from Upstream Source and apply transformation if needed.
    *
-   * @param commitTimelineOpt Timeline with completed commits
+   * @param commitsTimelineOpt Timeline with completed commits, including .commit and .deltacommit
    * @return Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> Input data read from upstream source, consists
    * of schemaProvider, checkpointStr and hoodieRecord
    * @throws Exception in case of any Exception
    */
-  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
+  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
     // Retrieve the previous round checkpoints, if any
     Option<String> resumeCheckpointStr = Option.empty();
-    if (commitTimelineOpt.isPresent()) {
-      resumeCheckpointStr = getCheckpointToResume(commitTimelineOpt);
+    if (commitsTimelineOpt.isPresent()) {
+      resumeCheckpointStr = getCheckpointToResume(commitsTimelineOpt);
     } else {
       // initialize the table for the first time.
       String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
@@ -650,16 +648,24 @@ public class DeltaSync implements Serializable, Closeable {
   /**
    * Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from.
-   * @param commitTimelineOpt commit timeline of interest.
+   *
+   * @param commitsTimelineOpt commits timeline of interest, including .commit and .deltacommit.
    * @return the checkpoint to resume from if applicable.
    * @throws IOException
    */
-  private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
+  private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
     Option<String> resumeCheckpointStr = Option.empty();
-    Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
+    // try to get the checkpoint from commits (including both .commit and .deltacommit)
+    // when migrating from COW to MOR, the first batch of the deltastreamer would otherwise lose the checkpoint from the COW table, causing data loss
+    HoodieTimeline deltaCommitTimeline = commitsTimelineOpt.get().filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+    // the presence of deltacommits means this is a MOR table, so read the checkpoint from .deltacommit as before
+    if (!deltaCommitTimeline.empty()) {
+      commitsTimelineOpt = Option.of(deltaCommitTimeline);
+    }
+    Option<HoodieInstant> lastCommit = commitsTimelineOpt.get().lastInstant();
     if (lastCommit.isPresent()) {
       // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
-      Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get());
+      Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitsTimelineOpt.get());
       if (commitMetadataOption.isPresent()) {
         HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
         LOG.debug("Checkpoint reset from metadata: " + commitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
@@ -675,7 +681,7 @@ public class DeltaSync implements Serializable, Closeable {
           throw new HoodieDeltaStreamerException(
               "Unable to find previous checkpoint. Please double check if this table "
                   + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
-                  + commitTimelineOpt.get().getInstants() + ", CommitMetadata="
+                  + commitsTimelineOpt.get().getInstants() + ", CommitMetadata="
                   + commitMetadata.toJsonString());
         }
         // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
@@ -807,7 +813,7 @@ public class DeltaSync implements Serializable, Closeable {
     String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
     if (errorTableWriter.isPresent()) {
       // Commit the error events triggered so far to the error table
-      Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+      Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
       boolean errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
       if (!errorTableSuccess) {
         switch (errorWriteFailureStrategy) {
@@ -1143,8 +1149,8 @@ public class DeltaSync implements Serializable, Closeable {
     return cfg;
   }
 
-  public Option<HoodieTimeline> getCommitTimelineOpt() {
-    return commitTimelineOpt;
+  public Option<HoodieTimeline> getCommitsTimelineOpt() {
+    return commitsTimelineOpt;
   }
 
   public HoodieIngestionMetrics getMetrics() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d5d8dca34d6..82b7589448e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -143,6 +143,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
 import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
@@ -2600,6 +2601,59 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
   }
 
+  @Test
+  public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
+    String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";
+    // default table type is COW
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+    // change cow to mor
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration(fs.getConf()))
+        .setBasePath(cfg.targetBasePath)
+        .setLoadActiveTimelineOnLoad(false)
+        .build();
+    Properties hoodieProps = new Properties();
+    hoodieProps.load(fs.open(new Path(cfg.targetBasePath + "/.hoodie/hoodie.properties")));
+    LOG.info("old props: {}", hoodieProps);
+    hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
+    LOG.info("new props: {}", hoodieProps);
+    Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
+    HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
+
+    // continue the deltastreamer
+    cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes
+    TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    // there should be 1 delta commit at this point
+    TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
+
+    // sync again now that the table type is already MOR
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes,
+    // so the total record count should be 1900 now
+    TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+    counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    // there should be 2 delta commits at this point
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+
+    // clean up
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+  }
+
   class TestDeltaSync extends DeltaSync {
 
     public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,
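
For readers who want the essence of the fix without the surrounding plumbing: getCheckpointToResume now prefers the deltacommit sub-timeline when one exists and otherwise keeps the full commits timeline, so a table freshly migrated from COW to MOR still finds the checkpoint written by its last .commit. The following is a minimal, self-contained sketch of that selection rule; CheckpointTimelineSketch, pickCheckpointTimeline and the List<String> of action names are hypothetical stand-ins for HoodieTimeline and the patched method, not actual Hudi APIs.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative stand-in for the rule the patch adds to getCheckpointToResume():
// prefer the deltacommit sub-timeline when it is non-empty (ordinary MOR table),
// otherwise fall back to the full commits timeline, which still carries the
// checkpoint written by the last COW commit after a COW -> MOR migration.
public class CheckpointTimelineSketch {

  // Hypothetical stand-in for a completed-instants timeline: each entry is an action name.
  static List<String> pickCheckpointTimeline(List<String> completedActions) {
    List<String> deltaCommits = completedActions.stream()
        .filter(action -> action.equals("deltacommit"))
        .collect(Collectors.toList());
    return deltaCommits.isEmpty() ? completedActions : deltaCommits;
  }

  public static void main(String[] args) {
    // Table written as COW, then switched to MOR before the next batch: only .commit
    // instants exist, and the fallback keeps them in scope for the checkpoint lookup.
    System.out.println(pickCheckpointTimeline(Arrays.asList("commit", "commit")));
    // Ordinary MOR table: deltacommits are preferred, matching the pre-patch behaviour.
    System.out.println(pickCheckpointTimeline(Arrays.asList("commit", "deltacommit", "deltacommit")));
  }
}

In the patch itself this corresponds to filtering commitsTimelineOpt by HoodieTimeline.DELTA_COMMIT_ACTION and substituting the filtered timeline only when it is non-empty.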