This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 20db8b7f6a5 [HUDI-7441] Move `getWritePartitionPaths` method to common module to decouple hive dependency (#10744)
20db8b7f6a5 is described below

commit 20db8b7f6a565981fad09839ee521b89ed91f341
Author: stayrascal <stayras...@users.noreply.github.com>
AuthorDate: Tue Feb 27 13:44:43 2024 +0800

    [HUDI-7441] Move `getWritePartitionPaths` method to common module to decouple hive dependency (#10744)

    Co-authored-by: wuzhiping <wuzhiping....@bytedance.com>
---
 .../org/apache/hudi/metadata/HoodieTableMetadataUtil.java   | 13 +++++++++++++
 .../java/org/apache/hudi/source/IncrementalInputSplits.java |  4 ++--
 .../hadoop/realtime/HoodieMergeOnReadTableInputFormat.java  |  3 ++-
 .../apache/hudi/hadoop/utils/HoodieInputFormatUtils.java    | 13 -------------
 .../org/apache/hudi/MergeOnReadIncrementalRelation.scala    |  3 ++-
 5 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 4ab72708d4d..07e8154e3b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -455,6 +455,19 @@ public class HoodieTableMetadataUtil {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Returns all the incremental write partition paths as a set with the given commits metadata.
+   *
+   * @param metadataList The commits metadata
+   * @return the partition path set
+   */
+  public static Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
+    return metadataList.stream()
+        .map(HoodieCommitMetadata::getWritePartitionPaths)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toSet());
+  }
+
   /**
    * Convert commit action metadata to bloom filter records.
    *
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index c1cd5874d96..2b4dec9995c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -35,7 +35,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.table.format.cdc.CdcInputSplit;
@@ -412,7 +412,7 @@ public class IncrementalInputSplits implements Serializable {
    * @return the set of read partitions
    */
   private Set<String> getReadPartitions(List<HoodieCommitMetadata> metadataList) {
-    Set<String> partitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
+    Set<String> partitions = HoodieTableMetadataUtil.getWritePartitionPaths(metadataList);
     // apply partition push down
     if (this.partitionPruner != null) {
       Set<String> selectedPartitions = this.partitionPruner.filter(partitions);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 0cfe0d0a194..76209422fe9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -192,7 +193,7 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
     // build fileGroup from fsView
     Path basePath = new Path(tableMetaClient.getBasePath());
     // filter affectedPartition by inputPaths
-    List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
+    List<String> affectedPartition = HoodieTableMetadataUtil.getWritePartitionPaths(metadataList).stream()
         .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
     if (affectedPartition.isEmpty()) {
       return result;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 8922b837871..4ab72701a11 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -514,19 +514,6 @@ public class HoodieInputFormatUtils {
     return fullPathToFileStatus.values().toArray(new FileStatus[0]);
   }
 
-  /**
-   * Returns all the incremental write partition paths as a set with the given commits metadata.
-   *
-   * @param metadataList The commits metadata
-   * @return the partition path set
-   */
-  public static Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
-    return metadataList.stream()
-        .map(HoodieCommitMetadata::getWritePartitionPaths)
-        .flatMap(Collection::stream)
-        .collect(Collectors.toSet());
-  }
-
   public static HoodieRealtimeFileSplit createRealtimeFileSplit(HoodieRealtimePath path, long start, long length, String[] hosts) {
     try {
       return new HoodieRealtimeFileSplit(new FileSplit(path, start, length, hosts), path);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 7b25fd9a8c7..243cf3db550 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -26,9 +26,10 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.
 import org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, concatTimeline, getCommitMetadata, handleHollowCommitIfNeeded}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getWritePartitionPaths, listAffectedFilesForCommits}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
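
For reference, a minimal usage sketch of the relocated helper. HoodieTableMetadataUtil.getWritePartitionPaths, HoodieCommitMetadata, HoodieWriteStat, and addWriteStat all live in hudi-common; the hand-built metadata objects and partition names below are purely hypothetical stand-ins for metadata that callers would normally read from the active timeline:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;

    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.HoodieWriteStat;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;

    public class WritePartitionPathsSketch {
      public static void main(String[] args) {
        // Hypothetical commit metadata; real callers obtain these from the timeline.
        HoodieCommitMetadata commit1 = new HoodieCommitMetadata();
        commit1.addWriteStat("2024/02/26", new HoodieWriteStat());

        HoodieCommitMetadata commit2 = new HoodieCommitMetadata();
        commit2.addWriteStat("2024/02/27", new HoodieWriteStat());
        commit2.addWriteStat("2024/02/26", new HoodieWriteStat());

        List<HoodieCommitMetadata> metadataList = Arrays.asList(commit1, commit2);

        // Unions the write partition paths across all commits into a de-duplicated set,
        // now without pulling in any hive/hadoop-mr classes.
        Set<String> partitions = HoodieTableMetadataUtil.getWritePartitionPaths(metadataList);
        System.out.println(partitions); // e.g. [2024/02/26, 2024/02/27]
      }
    }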