This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new c23da69  [HUDI-169] Speed up rolling back of instants (#968)
c23da69 is described below

commit c23da694ccbbf1f0ebcc5ed770dd56c74c1e06d7
Author: Balaji Varadarajan <varad...@uber.com>
AuthorDate: Thu Oct 24 19:34:00 2019 -0700

    [HUDI-169] Speed up rolling back of instants (#968)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  13 +
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |  70 ++---
 .../apache/hudi/table/HoodieMergeOnReadTable.java  | 328 +++++++++------------
 .../org/apache/hudi/table/RollbackExecutor.java    | 233 +++++++++++++++
 .../org/apache/hudi/table/RollbackRequest.java     | 109 +++++++
 5 files changed, 506 insertions(+), 247 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3597556..16b223c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -51,6 +51,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
   private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
   private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
+  private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
+  private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
   private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
   private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
   private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
@@ -141,6 +143,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
   }
 
+  public int getRollbackParallelism() {
+    return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
+  }
+
+
   public int getWriteBufferLimitBytes() {
     return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
   }
@@ -562,6 +569,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withRollbackParallelism(int rollbackParallelism) {
+      props.setProperty(ROLLBACK_PARALLELISM, String.valueOf(rollbackParallelism));
+      return this;
+    }
+
     public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
       props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit));
       return this;
@@ -651,6 +663,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
         BULKINSERT_PARALLELISM, DEFAULT_PARALLELISM);
     setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
+    setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_PARALLELISM);
     setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
         DEFAULT_COMBINE_BEFORE_INSERT);
     setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP),
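For context on the new knob: it is set through the existing builder, e.g. (a minimal sketch, assuming the usual HoodieWriteConfig.newBuilder() entry point; basePath is a placeholder):

    // Build a write config with rollback parallelism pinned to 100,
    // i.e. the value of hoodie.rollback.parallelism consumed by RollbackExecutor below
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)            // placeholder table base path
        .withRollbackParallelism(100)
        .build();

Note that although DEFAULT_ROLLBACK_PARALLELISM ("100") is declared, the setDefaults hunk above falls back to DEFAULT_PARALLELISM when hoodie.rollback.parallelism is unset.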
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 6644f14..e18578d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -32,10 +32,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.WriteStatus;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.HoodieCleanStat;
@@ -74,7 +72,6 @@ import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import scala.Tuple2;
@@ -294,45 +291,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     }
   }
 
-  /**
-   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
-   */
-  protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String partitionPath,
-      PathFilter filter) throws IOException {
-    logger.info("Cleaning path " + partitionPath);
-    FileSystem fs = getMetaClient().getFs();
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      boolean success = fs.delete(file.getPath(), false);
-      results.put(file, success);
-      logger.info("Delete file " + file.getPath() + "\t" + success);
-    }
-    return results;
-  }
-
-  /**
-   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
-   */
-  protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String commit,
-      String partitionPath) throws IOException {
-    logger.info("Cleaning path " + partitionPath);
-    FileSystem fs = getMetaClient().getFs();
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(".parquet")) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      boolean success = fs.delete(file.getPath(), false);
-      results.put(file, success);
-      logger.info("Delete file " + file.getPath() + "\t" + success);
-    }
-    return results;
-  }
-
   @Override
   public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
       throws IOException {
@@ -342,30 +300,38 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
         this.getInflightCommitTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
     // Atomically unpublish the commits
     if (!inflights.contains(commit)) {
+      logger.info("Unpublishing " + commit);
       activeTimeline.revertToInflight(new HoodieInstant(false, actionType, commit));
     }
-    logger.info("Unpublished " + commit);
+
+    HoodieInstant instantToRollback = new HoodieInstant(false, actionType, commit);
+    Long startTime = System.currentTimeMillis();
     // delete all the data files for this commit
     logger.info("Clean out all parquet files generated for commit: " + commit);
+    List<RollbackRequest> rollbackRequests = generateRollbackRequests(instantToRollback);
+
+    //TODO: We need to persist this as rollback workload and use it in case of partial failures
     List<HoodieRollbackStat> stats =
-        jsc.parallelize(FSUtils.getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(),
-            config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
-              // Scan all partitions files with this commit time
-              final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
-              deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
-              return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
-                  .withDeletedFileResults(filesToDeletedStatus).build();
-            }).collect();
+        new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
 
     // Delete Inflight instant if enabled
     deleteInflightInstant(deleteInstants, activeTimeline, new HoodieInstant(true, actionType, commit));
+    logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
     return stats;
   }
 
+  private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
+      throws IOException {
+    return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
+        config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> {
+          return RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath,
+              instantToRollback);
+        }).collect(Collectors.toList());
+  }
+
   /**
    * Delete Inflight instant if enabled
-   * 
+   *
    * @param deleteInstant Enable Deletion of Inflight instant
    * @param activeTimeline Hoodie active timeline
    * @param instantToBeDeleted Instant to be deleted
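Copy-on-write rollback is now two phases: enumerate per-partition RollbackRequests, then execute them through the shared RollbackExecutor. Stripped of the surrounding class, the flow added above is:

    // Phase 1: one delete-data-and-log-files request per partition (generateRollbackRequests above)
    List<RollbackRequest> requests = generateRollbackRequests(instantToRollback);
    // Phase 2: run all requests in parallel, merging per-partition HoodieRollbackStats
    List<HoodieRollbackStat> stats =
        new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, requests);

This replaces the old pattern of listing and deleting files inside a single Spark map over partition paths.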
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index f9b4141..8dcb3bf 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -19,22 +19,17 @@ package org.apache.hudi.table;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.WriteStatus;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
@@ -47,11 +42,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.SyncableFileSystemView;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.FSUtils;
@@ -59,10 +49,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.func.MergeOnReadLazyInsertIterable;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.HoodieAppendHandle;
 import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor;
 import org.apache.log4j.LogManager;
@@ -70,7 +58,6 @@ import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
 
 /**
  * Implementation of a more real-time read-optimized Hoodie Table where
@@ -180,7 +167,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   @Override
   public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
       throws IOException {
-
     // At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental
     // feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback
     // (commitToRollback).
@@ -198,140 +184,121 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     }
     logger.info("Unpublished " + commit);
     Long startTime = System.currentTimeMillis();
+    List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instantToRollback);
+
+    //TODO: We need to persist this as rollback workload and use it in case of partial failures
     List<HoodieRollbackStat> allRollbackStats =
-        jsc.parallelize(FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
-            config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
-          HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
-          HoodieRollbackStat hoodieRollbackStats = null;
-          // Need to put the path filter here since Filter is not serializable
-          // PathFilter to get all parquet files and log files that need to be deleted
-          PathFilter filter = (path) -> {
-            if (path.toString().contains(".parquet")) {
-              String fileCommitTime = FSUtils.getCommitTime(path.getName());
-              return commit.equals(fileCommitTime);
-            } else if (path.toString().contains(".log")) {
-              // Since the baseCommitTime is the only commit for new log files, it's okay here
-              String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
-              return commit.equals(fileCommitTime);
-            }
-            return false;
-          };
-
-          final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
-
-          switch (instantToRollback.getAction()) {
-            case HoodieTimeline.COMMIT_ACTION:
-              try {
-                // Rollback of a commit should delete the newly created parquet files along with any log
-                // files created with this as baseCommit. This is required to support multi-rollbacks in a MOR
-                // table.
-                super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
-                hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
-                    .withDeletedFileResults(filesToDeletedStatus).build();
-                break;
-              } catch (IOException io) {
-                throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
-              }
-            case HoodieTimeline.COMPACTION_ACTION:
-              try {
-                // If there is no delta commit present after the current commit (if compaction), no action, else we
-                // need to make sure that a compaction commit rollback also deletes any log files written as part of
-                // the
-                // succeeding deltacommit.
-                boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants()
-                    .findInstantsAfter(commit, 1).empty();
-                if (higherDeltaCommits) {
-                  // Rollback of a compaction action with no higher deltacommit means that the compaction is
-                  // scheduled
-                  // and has not yet finished. In this scenario we should delete only the newly created parquet
-                  // files
-                  // and not corresponding base commit log files created with this as baseCommit since updates would
-                  // have been written to the log files.
-                  super.deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
-                  hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
-                      .withDeletedFileResults(filesToDeletedStatus).build();
-                } else {
-                  // No deltacommits present after this compaction commit (inflight or requested). In this case, we
-                  // can also delete any log files that were created with this compaction commit as base
-                  // commit.
-                  super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
-                  hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
-                      .withDeletedFileResults(filesToDeletedStatus).build();
-                }
-                break;
-              } catch (IOException io) {
-                throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
-              }
-            case HoodieTimeline.DELTA_COMMIT_ACTION:
-              // --------------------------------------------------------------------------------------------------
-              // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
-              // --------------------------------------------------------------------------------------------------
-              // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries.
-              // In
-              // this scenario we would want to delete these log files.
-              // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
-              // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
-              // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
-              // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime
-              // and
-              // and hence will end up deleting these log files. This is done so there are no orphan log files
-              // lying around.
-              // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
-              // taken in this scenario is a combination of (A.2) and (A.3)
-              // ---------------------------------------------------------------------------------------------------
-              // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
-              // ---------------------------------------------------------------------------------------------------
-              // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no
-              // entries.
-              // In this scenario, we delete all the parquet files written for the failed commit.
-              // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
-              // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
-              // (B.3) Rollback triggered for first commit - Same as (B.1)
-              // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
-              // as well if the base parquet file gets deleted.
-              try {
-                HoodieCommitMetadata commitMetadata =
-                    HoodieCommitMetadata.fromBytes(
-                        metaClient.getCommitTimeline().getInstantDetails(new HoodieInstant(true,
-                            instantToRollback.getAction(), instantToRollback.getTimestamp())).get(),
-                        HoodieCommitMetadata.class);
-
-                // read commit file and (either append delete blocks or delete file)
-                Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
-
-                // In case all data was inserts and the commit failed, delete the file belonging to that commit
-                // We do not know fileIds for inserts (first inserts are either log files or parquet files),
-                // delete all files for the corresponding failed commit, if present (same as COW)
-                super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
-                final Set<String> deletedFiles = filesToDeletedStatus.entrySet().stream().map(entry -> {
-                  Path filePath = entry.getKey().getPath();
-                  return FSUtils.getFileIdFromFilePath(filePath);
-                }).collect(Collectors.toSet());
-
-                // append rollback blocks for updates
-                if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
-                  hoodieRollbackStats = rollback(index, partitionPath, commit, commitMetadata, filesToDeletedStatus,
-                      filesToNumBlocksRollback, deletedFiles);
-                }
-                break;
-              } catch (IOException io) {
-                throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
-              }
-            default:
-              break;
-          }
-          return hoodieRollbackStats;
-        }).filter(Objects::nonNull).collect();
-
+        new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
 
     // Delete Inflight instants if enabled
-    deleteInflightInstant(deleteInstants, this.getActiveTimeline(),
-        new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()));
+    deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback
+        .getAction(), instantToRollback.getTimestamp()));
 
-    logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
+    logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
     return allRollbackStats;
   }
 
+  /**
+   * Generate all rollback requests needed for rolling back this action, without actually performing the rollback
+   *
+   * @param jsc JavaSparkContext
+   * @param instantToRollback Instant to Rollback
+   * @return list of rollback requests
+   * @throws IOException
+   */
+  private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback)
+      throws IOException {
+    String commit = instantToRollback.getTimestamp();
+    List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
+        config.shouldAssumeDatePartitioning());
+    int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
+    return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions))
+        .flatMap(partitionPath -> {
+          HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
+          List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
+          switch (instantToRollback.getAction()) {
+            case HoodieTimeline.COMMIT_ACTION:
+              logger.info("Rolling back commit action. There are higher delta commits. So only rolling back this "
+                  + "instant");
+              partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(
+                  partitionPath, instantToRollback));
+              break;
+            case HoodieTimeline.COMPACTION_ACTION:
+              // If there is no delta commit present after the current commit (if compaction), no action, else we
+              // need to make sure that a compaction commit rollback also deletes any log files written as part of the
+              // succeeding deltacommit.
+              boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline()
+                  .filterCompletedInstants().findInstantsAfter(commit, 1).empty();
+              if (higherDeltaCommits) {
+                // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
+                // and has not yet finished. In this scenario we should delete only the newly created parquet files
+                // and not corresponding base commit log files created with this as baseCommit since updates would
+                // have been written to the log files.
+                logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
+                partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(
+                    partitionPath, instantToRollback));
+              } else {
+                // No deltacommits present after this compaction commit (inflight or requested). In this case, we
+                // can also delete any log files that were created with this compaction commit as base
+                // commit.
+                logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data"
+                    + " and log files");
+                partitionRollbackRequests.add(
+                    RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath,
+                        instantToRollback));
+              }
+              break;
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+              // --------------------------------------------------------------------------------------------------
+              // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
+              // --------------------------------------------------------------------------------------------------
+              // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
+              // this scenario we would want to delete these log files.
+              // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
+              // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
+              // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
+              // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime
+              // and hence will end up deleting these log files. This is done so there are no orphan log files
+              // lying around.
+              // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
+              // taken in this scenario is a combination of (A.2) and (A.3)
+              // ---------------------------------------------------------------------------------------------------
+              // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
+              // ---------------------------------------------------------------------------------------------------
+              // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
+              // In this scenario, we delete all the parquet files written for the failed commit.
+              // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
+              // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
+              // (B.3) Rollback triggered for first commit - Same as (B.1)
+              // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
+              // as well if the base parquet file gets deleted.
+              try {
+                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+                    metaClient.getCommitTimeline().getInstantDetails(
+                        new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
+                        .get(), HoodieCommitMetadata.class);
+
+                // In case all data was inserts and the commit failed, delete the file belonging to that commit
+                // We do not know fileIds for inserts (first inserts are either log files or parquet files),
+                // delete all files for the corresponding failed commit, if present (same as COW)
+                partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(
+                    partitionPath, instantToRollback));
+
+                // append rollback blocks for updates
+                if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+                  partitionRollbackRequests
+                      .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
+                }
+                break;
+              } catch (IOException io) {
+                throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
+              }
+            default:
+              break;
+          }
+          return partitionRollbackRequests.iterator();
+        }).filter(Objects::nonNull).collect();
+  }
+
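Two details in generateRollbackRequests are worth calling out: the Spark parallelism is clamped so parallelize() never receives zero slices, and flatMap is used because one partition may emit several requests (the delta-commit case adds a delete request plus any number of append-block requests). Schematically (requestsFor is an illustrative stand-in for the switch above):

    // Clamp: at most one slice per partition, at least one slice overall
    int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
    JavaRDD<RollbackRequest> requests =
        jsc.parallelize(partitions, sparkPartitions).flatMap(p -> requestsFor(p).iterator());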
   @Override
   public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
       throws HoodieIOException {
@@ -450,19 +417,10 @@
     }
   }
 
-  private Map<HeaderMetadataType, String> generateHeader(String commit) {
-    // generate metadata
-    Map<HeaderMetadataType, String> header = Maps.newHashMap();
-    header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
-    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
-    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    return header;
-  }
+  private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
+      HoodieCommitMetadata commitMetadata) {
+    Preconditions.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
 
-  private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit,
-      HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus,
-      Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) {
     // wStat.getPrevCommit() might not give the right commit time in the following
     // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
     // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
@@ -470,47 +428,27 @@ // baseCommit always by listing the file slice
     Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath)
         .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
-    commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
-      // Filter out stats without prevCommit since they are all inserts
-      return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null
-          && !deletedFiles.contains(wStat.getFileId());
-    }).forEach(wStat -> {
-      Writer writer = null;
-      String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
-      if (null != baseCommitTime) {
-        boolean success = false;
-        try {
-          writer = HoodieLogFormat.newWriterBuilder()
-              .onParentPath(FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), partitionPath))
-              .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime).withFs(this.metaClient.getFs())
-              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-          // generate metadata
-          Map<HeaderMetadataType, String> header = generateHeader(commit);
-          // if update belongs to an existing log file
-          writer = writer.appendBlock(new HoodieCommandBlock(header));
-          success = true;
-        } catch (IOException | InterruptedException io) {
-          throw new HoodieRollbackException("Failed to rollback for commit " + commit, io);
-        } finally {
-          try {
-            if (writer != null) {
-              writer.close();
-            }
-            if (success) {
-              // This step is intentionally done after writer is closed. Guarantees that
-              // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
-              // cloud-storage : HUDI-168
-              filesToNumBlocksRollback.put(this.getMetaClient().getFs().getFileStatus(writer.getLogFile().getPath()),
-                  1L);
-            }
-          } catch (IOException io) {
-            throw new UncheckedIOException(io);
+    return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
+        .filter(wStat -> {
+
+          // Filter out stats without prevCommit since they are all inserts
+          boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT)
+              && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
+
+          if (validForRollback) {
+            // For sanity, log instant time can never be less than base-commit on which we are rolling back
+            Preconditions.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
+                wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
           }
-        }
-      }
-    });
-    return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback).build();
-  }
+
+          return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
+              // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
+              // to delete and we should not step on it
+              wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
+        }).map(wStat -> {
+          String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
+          return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
+              baseCommitTime, rollbackInstant);
+        }).collect(Collectors.toList());
+  }
 }
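The filter in generateAppendRollbackBlocksAction encodes an ordering invariant between a log file's base commit and the instant being rolled back. In isolation (baseCommit and rollbackTs are illustrative locals):

    // Sanity: a surviving file slice's base commit can never be newer than the rolled-back instant
    Preconditions.checkArgument(
        HoodieTimeline.compareTimestamps(baseCommit, rollbackTs, HoodieTimeline.LESSER_OR_EQUAL));
    // Append a rollback block only when strictly older; on equality (inserts-to-logs),
    // the delete-data-and-log-files request issued above already covers the file
    boolean shouldAppendBlock =
        HoodieTimeline.compareTimestamps(baseCommit, rollbackTs, HoodieTimeline.LESSER);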
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
new file mode 100644
index 0000000..8bdf371
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+/**
+ * Performs Rollback of Hoodie Tables
+ */
+public class RollbackExecutor implements Serializable {
+
+  private static Logger logger = LogManager.getLogger(RollbackExecutor.class);
+
+  private final HoodieTableMetaClient metaClient;
+  private final HoodieWriteConfig config;
+
+  public RollbackExecutor(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Performs all rollback actions that we have collected in parallel.
+   */
+  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc,
+      HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
+
+    SerializablePathFilter filter = (path) -> {
+      if (path.toString().contains(".parquet")) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return instantToRollback.getTimestamp().equals(fileCommitTime);
+      } else if (path.toString().contains(".log")) {
+        // Since the baseCommitTime is the only commit for new log files, it's okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return instantToRollback.getTimestamp().equals(fileCommitTime);
+      }
+      return false;
+    };
+
+    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
+      final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
+      switch (rollbackRequest.getRollbackAction()) {
+        case DELETE_DATA_FILES_ONLY: {
+          deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath());
+          return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withDeletedFileResults(filesToDeletedStatus).build());
+        }
+        case DELETE_DATA_AND_LOG_FILES: {
+          deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
+          return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withDeletedFileResults(filesToDeletedStatus).build());
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          Writer writer = null;
+          boolean success = false;
+          try {
+            writer = HoodieLogFormat.newWriterBuilder().onParentPath(
+                FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+                .withFileId(rollbackRequest.getFileId().get())
+                .overBaseCommit(rollbackRequest.getLatestBaseInstant().get())
+                .withFs(metaClient.getFs())
+                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+            // generate metadata
+            Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
+            // if update belongs to an existing log file
+            writer = writer.appendBlock(new HoodieCommandBlock(header));
+            success = true;
+          } catch (IOException | InterruptedException io) {
+            throw new HoodieRollbackException(
+                "Failed to rollback for instant " + instantToRollback, io);
+          } finally {
+            try {
+              if (writer != null) {
+                writer.close();
+              }
+            } catch (IOException io) {
+              throw new UncheckedIOException(io);
+            }
+          }
+
+          // This step is intentionally done after writer is closed. Guarantees that
+          // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
+          // cloud-storage : HUDI-168
+          Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
+          filesToNumBlocksRollback.put(metaClient.getFs()
+              .getFileStatus(writer.getLogFile().getPath()), 1L);
+          return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+        }
+        default:
+          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
+      }
+    }).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();
+  }
+
+  /**
+   * Helper to merge 2 rollback-stats for a given partition
+   *
+   * @param stat1 HoodieRollbackStat
+   * @param stat2 HoodieRollbackStat
+   * @return Merged HoodieRollbackStat
+   */
+  private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) {
+    Preconditions.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath()));
+    final List<String> successDeleteFiles = new ArrayList<>();
+    final List<String> failedDeleteFiles = new ArrayList<>();
+    final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
+
+    if (stat1.getSuccessDeleteFiles() != null) {
+      successDeleteFiles.addAll(stat1.getSuccessDeleteFiles());
+    }
+    if (stat2.getSuccessDeleteFiles() != null) {
+      successDeleteFiles.addAll(stat2.getSuccessDeleteFiles());
+    }
+    if (stat1.getFailedDeleteFiles() != null) {
+      failedDeleteFiles.addAll(stat1.getFailedDeleteFiles());
+    }
+    if (stat2.getFailedDeleteFiles() != null) {
+      failedDeleteFiles.addAll(stat2.getFailedDeleteFiles());
+    }
+    if (stat1.getCommandBlocksCount() != null) {
+      commandBlocksCount.putAll(stat1.getCommandBlocksCount());
+    }
+    if (stat2.getCommandBlocksCount() != null) {
+      commandBlocksCount.putAll(stat2.getCommandBlocksCount());
+    }
+    return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+  }
+
+  /**
+   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
+   */
+  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+      Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException {
+    logger.info("Cleaning path " + partitionPath);
+    FileSystem fs = metaClient.getFs();
+    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+    for (FileStatus file : toBeDeleted) {
+      boolean success = fs.delete(file.getPath(), false);
+      results.put(file, success);
+      logger.info("Delete file " + file.getPath() + "\t" + success);
+    }
+    return results;
+  }
+
+  /**
+   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
+   */
+  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+      Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
+    logger.info("Cleaning path " + partitionPath);
+    FileSystem fs = metaClient.getFs();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(".parquet")) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+    for (FileStatus file : toBeDeleted) {
+      boolean success = fs.delete(file.getPath(), false);
+      results.put(file, success);
+      logger.info("Delete file " + file.getPath() + "\t" + success);
+    }
+    return results;
+  }
+
+
+  private Map<HeaderMetadataType, String> generateHeader(String commit) {
+    // generate metadata
+    Map<HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
+    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return header;
+  }
+
+  public interface SerializablePathFilter extends PathFilter, Serializable {
+
+  }
+}
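A note on the nested SerializablePathFilter: Hadoop's PathFilter is not Serializable, which is why the pre-patch code had to rebuild its filters inside each Spark task ("Need to put the path filter here since Filter is not serializable"). Declaring the lambda against the intersection interface lets a single instance be captured by the executor closure, e.g. (illustrative only):

    RollbackExecutor.SerializablePathFilter parquetOnly =
        path -> path.toString().contains(".parquet");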
+ */ + +package org.apache.hudi.table; + +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; + +/** + * Request for performing one rollback action + */ +public class RollbackRequest { + + /** + * Rollback Action Types + */ + public enum RollbackAction { + DELETE_DATA_FILES_ONLY, + DELETE_DATA_AND_LOG_FILES, + APPEND_ROLLBACK_BLOCK + } + + /** + * Partition path that needs to be rolled-back + */ + private final String partitionPath; + + /** + * Rollback Instant + */ + private final HoodieInstant rollbackInstant; + + /** + * FileId in case of appending rollback block + */ + private final Option<String> fileId; + + /** + * Latest base instant needed for appending rollback block instant + */ + private final Option<String> latestBaseInstant; + + /** + * Rollback Action + */ + private final RollbackAction rollbackAction; + + public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant, + Option<String> fileId, Option<String> latestBaseInstant, RollbackAction rollbackAction) { + this.partitionPath = partitionPath; + this.rollbackInstant = rollbackInstant; + this.fileId = fileId; + this.latestBaseInstant = latestBaseInstant; + this.rollbackAction = rollbackAction; + } + + public static RollbackRequest createRollbackRequestWithDeleteDataFilesOnlyAction(String partitionPath, + HoodieInstant rollbackInstant) { + return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(), + RollbackAction.DELETE_DATA_FILES_ONLY); + } + + public static RollbackRequest createRollbackRequestWithDeleteDataAndLogFilesAction(String partitionPath, + HoodieInstant rollbackInstant) { + return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(), + RollbackAction.DELETE_DATA_AND_LOG_FILES); + } + + public static RollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, String fileId, + String baseInstant, HoodieInstant rollbackInstant) { + return new RollbackRequest(partitionPath, rollbackInstant, Option.of(fileId), Option.of(baseInstant), + RollbackAction.APPEND_ROLLBACK_BLOCK); + } + + public String getPartitionPath() { + return partitionPath; + } + + public HoodieInstant getRollbackInstant() { + return rollbackInstant; + } + + public Option<String> getFileId() { + return fileId; + } + + public Option<String> getLatestBaseInstant() { + return latestBaseInstant; + } + + public RollbackAction getRollbackAction() { + return rollbackAction; + } +}