This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new f2ba0fead2 [HUDI-3085] Improve bulk insert partitioner abstraction (#4441) f2ba0fead2 is described below commit f2ba0fead24072e37c68477d0cfc3489fa098938 Author: Yuwei XIAO <ywxiaoz...@gmail.com> AuthorDate: Mon Apr 25 18:42:17 2022 +0800 [HUDI-3085] Improve bulk insert partitioner abstraction (#4441) --- .../apache/hudi/table/BulkInsertPartitioner.java | 28 +++++++++++++++++++++- .../table/action/commit/BaseBulkInsertHelper.java | 2 +- .../run/strategy/JavaExecutionStrategy.java | 11 +++++---- .../table/action/commit/JavaBulkInsertHelper.java | 17 +++++++------ .../MultipleSparkJobExecutionStrategy.java | 5 ++-- .../SparkSingleFileSortExecutionStrategy.java | 1 + .../bulkinsert/BulkInsertMapFunction.java | 11 +++++---- .../bulkinsert/RDDSpatialCurveSortPartitioner.java | 11 ++++----- .../table/action/commit/SparkBulkInsertHelper.java | 24 +++++++------------ 9 files changed, 65 insertions(+), 45 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index fd1558a823..63b502531a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -18,12 +18,18 @@ package org.apache.hudi.table; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.WriteHandleFactory; + +import java.io.Serializable; + /** * Repartition input records into at least expected number of output spark partitions. It should give below guarantees - * Output spark partition will have records from only one hoodie partition. - Average records per output spark * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews. 
*/ -public interface BulkInsertPartitioner<I> { +public interface BulkInsertPartitioner<I> extends Serializable { /** * Repartitions the input records into at least expected number of output spark partitions. @@ -38,4 +44,24 @@ public interface BulkInsertPartitioner<I> { * @return {@code true} if the records within a partition are sorted; {@code false} otherwise. */ boolean arePartitionRecordsSorted(); + + /** + * Return file group id prefix for the given data partition. + * By default, return a new file group id prefix, so that incoming records will route to a fresh new file group + * @param partitionId data partition + * @return + */ + default String getFileIdPfx(int partitionId) { + return FSUtils.createNewFileIdPfx(); + } + + /** + * Return write handle factory for the given partition. + * @param partitionId data partition + * @return + */ + default Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) { + return Option.empty(); + } + } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java index ad2145c350..5355194ff7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java @@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K, public abstract O bulkInsert(I inputRecords, String instantTime, HoodieTable<T, I, K, O> table, HoodieWriteConfig config, boolean performDedupe, - Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner, + BulkInsertPartitioner partitioner, boolean addMetadataFields, int parallelism, WriteHandleFactory writeHandleFactory); diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index 7d7609f0fa..233c70ecf9 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -41,6 +41,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory; import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner; import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.storage.HoodieFileReader; @@ -121,16 +122,16 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>> * * @param strategyParams Strategy parameters containing columns to sort the data by when clustering. * @param schema Schema of the data including metadata fields. - * @return empty for now. 
+ * @return partitioner for the java engine */ - protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) { + protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) { if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { - return Option.of(new JavaCustomColumnsSortPartitioner( + return new JavaCustomColumnsSortPartitioner( strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), HoodieAvroUtils.addMetadataFields(schema), - getWriteConfig().isConsistentLogicalTimestampEnabled())); + getWriteConfig().isConsistentLogicalTimestampEnabled()); } else { - return Option.empty(); + return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 39b2916732..e126372aa9 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -77,8 +77,11 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base config.shouldAllowMultiWriteOnSameInstant()); } + BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode())); + // write new files - List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); + List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, + 
config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); //update index ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -90,7 +93,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieWriteConfig config, boolean performDedupe, - Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner, + BulkInsertPartitioner partitioner, boolean useWriterSchema, int parallelism, WriteHandleFactory writeHandleFactory) { @@ -103,12 +106,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base parallelism, table); } - final List<HoodieRecord<T>> repartitionedRecords; - BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() - ? userDefinedBulkInsertPartitioner.get() - : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); - // only List is supported for Java partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463 - repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism); + final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism); FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass( config.getFileIdPrefixProviderClassName(), @@ -119,7 +117,8 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(), - new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll); + // Always get the first WriteHandleFactory, as there is only a single data partition for hudi java engine. 
+ (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll); return writeStatuses; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 5a03cdf3bc..e09457f0e5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -46,6 +46,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory; import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner; import org.apache.hudi.io.IOUtils; @@ -137,7 +138,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa * @param schema Schema of the data including metadata fields. * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty. 
*/ - protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) { + protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) { Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key())) .map(listStr -> listStr.split(",")); @@ -159,7 +160,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa default: throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy)); } - }); + }).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode())); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java index 4a7ee7bcee..b61017c34c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java @@ -72,6 +72,7 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload< .withProps(getWriteConfig().getProps()).build(); // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value. 
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE)); + return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata)); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java index 24cdd70603..66c3bdddcb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.SparkLazyInsertIterable; import org.apache.hudi.io.WriteHandleFactory; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.spark.api.java.function.Function2; @@ -41,27 +42,27 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload> private boolean areRecordsSorted; private HoodieWriteConfig config; private HoodieTable hoodieTable; - private List<String> fileIDPrefixes; private boolean useWriterSchema; + private BulkInsertPartitioner partitioner; private WriteHandleFactory writeHandleFactory; public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted, HoodieWriteConfig config, HoodieTable hoodieTable, - List<String> fileIDPrefixes, boolean useWriterSchema, + boolean useWriterSchema, BulkInsertPartitioner partitioner, WriteHandleFactory writeHandleFactory) { this.instantTime = instantTime; this.areRecordsSorted = areRecordsSorted; this.config = 
config; this.hoodieTable = hoodieTable; - this.fileIDPrefixes = fileIDPrefixes; this.useWriterSchema = useWriterSchema; this.writeHandleFactory = writeHandleFactory; + this.partitioner = partitioner; } @Override public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) { return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable, - fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema, - writeHandleFactory); + partitioner.getFileIdPfx(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema, + (WriteHandleFactory) partitioner.getWriteHandleFactory(partition).orElse(this.writeHandleFactory)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java index 219fb0b165..50a0a534f8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java @@ -49,9 +49,9 @@ import java.util.List; public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload> implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> { - private final HoodieSparkEngineContext sparkEngineContext; + private final transient HoodieSparkEngineContext sparkEngineContext; private final String[] orderByColumns; - private final Schema schema; + private final SerializableSchema schema; private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy; private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType; @@ -64,14 +64,13 @@ public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload> this.orderByColumns = orderByColumns; 
this.layoutOptStrategy = layoutOptStrategy; this.curveCompositionStrategyType = curveCompositionStrategyType; - this.schema = schema; + this.schema = new SerializableSchema(schema); } @Override public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) { - SerializableSchema serializableSchema = new SerializableSchema(schema); JavaRDD<GenericRecord> genericRecordsRDD = - records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get()); + records.map(f -> (GenericRecord) f.getData().getInsertValue(schema.get()).get()); Dataset<Row> sourceDataset = AvroConversionUtils.createDataFrame( @@ -82,7 +81,7 @@ public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload> Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions); - return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty()) + return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty()) .toJavaRDD() .map(record -> { String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 38e38101b0..1652c35eb6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -20,7 +20,6 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieRecordPayload; @@ -39,8 +38,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * A spark implementation of {@link BaseBulkInsertHelper}. @@ -76,9 +73,12 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED, executor.getCommitActionType(), instantTime), Option.empty(), config.shouldAllowMultiWriteOnSameInstant()); + + BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode())); + // write new files - HoodieData<WriteStatus> writeStatuses = - bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); + HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, + config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); //update index ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -90,7 +90,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, HoodieWriteConfig config, boolean performDedupe, - Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner, + BulkInsertPartitioner partitioner, boolean useWriterSchema, int parallelism, WriteHandleFactory writeHandleFactory) { @@ -103,20 +103,12 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas parallelism, table); } - final HoodieData<HoodieRecord<T>> repartitionedRecords; - 
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() - ? userDefinedBulkInsertPartitioner.get() - : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463 - repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism)); - - // generate new file ID prefixes for each output partition - final List<String> fileIDPrefixes = - IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList()); + final HoodieData<HoodieRecord<T>> repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism)); JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords) .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime, - partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true) + partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner, writeHandleFactory), true) .flatMap(List::iterator); return HoodieJavaRDD.of(writeStatusRDD);