This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f2ba0fead2 [HUDI-3085] Improve bulk insert partitioner abstraction (#4441)
f2ba0fead2 is described below

commit f2ba0fead24072e37c68477d0cfc3489fa098938
Author: Yuwei XIAO <ywxiaoz...@gmail.com>
AuthorDate: Mon Apr 25 18:42:17 2022 +0800

    [HUDI-3085] Improve bulk insert partitioner abstraction (#4441)
---
 .../apache/hudi/table/BulkInsertPartitioner.java   | 28 +++++++++++++++++++++-
 .../table/action/commit/BaseBulkInsertHelper.java  |  2 +-
 .../run/strategy/JavaExecutionStrategy.java        | 11 +++++----
 .../table/action/commit/JavaBulkInsertHelper.java  | 17 +++++++------
 .../MultipleSparkJobExecutionStrategy.java         |  5 ++--
 .../SparkSingleFileSortExecutionStrategy.java      |  1 +
 .../bulkinsert/BulkInsertMapFunction.java          | 11 +++++----
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java | 11 ++++-----
 .../table/action/commit/SparkBulkInsertHelper.java | 24 +++++++------------
 9 files changed, 65 insertions(+), 45 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index fd1558a823..63b502531a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -18,12 +18,18 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+
 /**
  * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
  * Output spark partition will have records from only one hoodie partition. - Average records per output spark
  * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
  */
-public interface BulkInsertPartitioner<I> {
+public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
    * Repartitions the input records into at least expected number of output spark partitions.
@@ -38,4 +44,24 @@ public interface BulkInsertPartitioner<I> {
    * @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
    */
   boolean arePartitionRecordsSorted();
+
+  /**
+   * Return file group id prefix for the given data partition.
+   * By default, return a new file group id prefix, so that incoming records will route to a fresh new file group.
+   * @param partitionId data partition
+   * @return
+   */
+  default String getFileIdPfx(int partitionId) {
+    return FSUtils.createNewFileIdPfx();
+  }
+
+  /**
+   * Return write handle factory for the given partition.
+   * @param partitionId data partition
+   * @return
+   */
+  default Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
+    return Option.empty();
+  }
+
 }
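
For illustration only (not part of this commit): with BulkInsertPartitioner now extending Serializable and exposing per-partition hooks, a user-defined Spark partitioner can control both the file group id prefix and the write handle factory for each output partition. Below is a minimal sketch assuming Hudi's Spark client is on the classpath; the class name and the key-based sort are hypothetical, and the overrides are purely illustrative.

    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.io.CreateHandleFactory;
    import org.apache.hudi.io.WriteHandleFactory;
    import org.apache.hudi.table.BulkInsertPartitioner;
    import org.apache.spark.api.java.JavaRDD;

    public class KeySortedBulkInsertPartitioner<T extends HoodieRecordPayload>
        implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

      @Override
      public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
        // Sort by record key and spread the result across the requested number of Spark partitions.
        return records.sortBy(r -> r.getKey().getRecordKey(), true, outputSparkPartitions);
      }

      @Override
      public boolean arePartitionRecordsSorted() {
        return true;
      }

      @Override
      public String getFileIdPfx(int partitionId) {
        // Same behavior as the new default: route each output partition to a fresh file group.
        return FSUtils.createNewFileIdPfx();
      }

      @Override
      @SuppressWarnings("unchecked")
      public Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
        // Supply a handle factory per partition; returning Option.empty() falls back to the caller's factory.
        return Option.of(new CreateHandleFactory(false));
      }
    }
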
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index ad2145c350..5355194ff7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
   public abstract O bulkInsert(I inputRecords, String instantTime,
                                HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
                                boolean performDedupe,
-                               Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+                               BulkInsertPartitioner partitioner,
                                boolean addMetadataFields,
                                int parallelism,
                                WriteHandleFactory writeHandleFactory);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 7d7609f0fa..233c70ecf9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
 import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
@@ -121,16 +122,16 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
    *
    * @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
    * @param schema         Schema of the data including metadata fields.
-   * @return empty for now.
+   * @return partitioner for the java engine
    */
-  protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
-      return Option.of(new JavaCustomColumnsSortPartitioner(
+      return new JavaCustomColumnsSortPartitioner(
           strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
           HoodieAvroUtils.addMetadataFields(schema),
-          getWriteConfig().isConsistentLogicalTimestampEnabled()));
+          getWriteConfig().isConsistentLogicalTimestampEnabled());
     } else {
-      return Option.empty();
+      return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
     }
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 39b2916732..e126372aa9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -77,8 +77,11 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
           config.shouldAllowMultiWriteOnSameInstant());
     }
 
+    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
+
     // write new files
-    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+        config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
     //update index
     ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
@@ -90,7 +93,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
                                       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                                       HoodieWriteConfig config,
                                       boolean performDedupe,
-                                      Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+                                      BulkInsertPartitioner partitioner,
                                       boolean useWriterSchema,
                                       int parallelism,
                                       WriteHandleFactory writeHandleFactory) {
@@ -103,12 +106,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
           parallelism, table);
     }
 
-    final List<HoodieRecord<T>> repartitionedRecords;
-    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
-        ? userDefinedBulkInsertPartitioner.get()
-        : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
-    // only List is supported for Java partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
-    repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
+    final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
 
     FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
         config.getFileIdPrefixProviderClassName(),
@@ -119,7 +117,8 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
     new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true,
         config, instantTime, table,
         fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
-        new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+        // Always get the first WriteHandleFactory, as there is only a single data partition for hudi java engine.
+        (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll);
 
     return writeStatuses;
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 5a03cdf3bc..e09457f0e5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -46,6 +46,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
 import org.apache.hudi.io.IOUtils;
@@ -137,7 +138,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * @param schema         Schema of the data including metadata fields.
    * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
    */
-  protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
     Option<String[]> orderByColumnsOpt =
         Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
             .map(listStr -> listStr.split(","));
@@ -159,7 +160,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
         default:
           throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
       }
-    });
+    }).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 4a7ee7bcee..b61017c34c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -72,6 +72,7 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<
         .withProps(getWriteConfig().getProps()).build();
     // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
+
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
index 24cdd70603..66c3bdddcb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.function.Function2;
@@ -41,27 +42,27 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
   private boolean areRecordsSorted;
   private HoodieWriteConfig config;
   private HoodieTable hoodieTable;
-  private List<String> fileIDPrefixes;
   private boolean useWriterSchema;
+  private BulkInsertPartitioner partitioner;
   private WriteHandleFactory writeHandleFactory;
 
   public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
                                HoodieWriteConfig config, HoodieTable hoodieTable,
-                               List<String> fileIDPrefixes, boolean useWriterSchema,
+                               boolean useWriterSchema, BulkInsertPartitioner partitioner,
                                WriteHandleFactory writeHandleFactory) {
     this.instantTime = instantTime;
     this.areRecordsSorted = areRecordsSorted;
     this.config = config;
     this.hoodieTable = hoodieTable;
-    this.fileIDPrefixes = fileIDPrefixes;
     this.useWriterSchema = useWriterSchema;
     this.writeHandleFactory = writeHandleFactory;
+    this.partitioner = partitioner;
   }
 
   @Override
   public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
     return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
-        fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
-        writeHandleFactory);
+        partitioner.getFileIdPfx(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
+        (WriteHandleFactory) partitioner.getWriteHandleFactory(partition).orElse(this.writeHandleFactory));
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 219fb0b165..50a0a534f8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -49,9 +49,9 @@ import java.util.List;
 public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
-  private final HoodieSparkEngineContext sparkEngineContext;
+  private final transient HoodieSparkEngineContext sparkEngineContext;
   private final String[] orderByColumns;
-  private final Schema schema;
+  private final SerializableSchema schema;
   private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
   private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
 
@@ -64,14 +64,13 @@ public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
     this.orderByColumns = orderByColumns;
     this.layoutOptStrategy = layoutOptStrategy;
     this.curveCompositionStrategyType = curveCompositionStrategyType;
-    this.schema = schema;
+    this.schema = new SerializableSchema(schema);
   }
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
-    SerializableSchema serializableSchema = new SerializableSchema(schema);
     JavaRDD<GenericRecord> genericRecordsRDD =
-        records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
+        records.map(f -> (GenericRecord) f.getData().getInsertValue(schema.get()).get());
 
     Dataset<Row> sourceDataset =
         AvroConversionUtils.createDataFrame(
@@ -82,7 +81,7 @@ public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
 
     Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
 
-    return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty())
+    return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
         .toJavaRDD()
         .map(record -> {
           String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index 38e38101b0..1652c35eb6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,8 +38,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * A spark implementation of {@link BaseBulkInsertHelper}.
@@ -76,9 +73,12 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
             executor.getCommitActionType(), instantTime), Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
+
+    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
+
     // write new files
-    HoodieData<WriteStatus> writeStatuses =
-        bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+    HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+        config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
     //update index
     ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
@@ -90,7 +90,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
                                             HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                                             HoodieWriteConfig config,
                                             boolean performDedupe,
-                                            Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+                                            BulkInsertPartitioner partitioner,
                                             boolean useWriterSchema,
                                             int parallelism,
                                             WriteHandleFactory writeHandleFactory) {
@@ -103,20 +103,12 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
           parallelism, table);
     }
 
-    final HoodieData<HoodieRecord<T>> repartitionedRecords;
-    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
-        ? userDefinedBulkInsertPartitioner.get()
-        : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
     // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
-    repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
-
-    // generate new file ID prefixes for each output partition
-    final List<String> fileIDPrefixes =
-        IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
+    final HoodieData<HoodieRecord<T>> repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
 
     JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords)
         .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
-            partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true)
+            partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner, writeHandleFactory), true)
         .flatMap(List::iterator);
 
     return HoodieJavaRDD.of(writeStatusRDD);
