This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 0258a89112a [HUDI-6515] Fix Spark2 do not support bucket bulk insert (#9163)
0258a89112a is described below

commit 0258a89112a6071a8074757236e19a7b27539dbd
Author: StreamingFlames <18889897...@163.com>
AuthorDate: Tue Jul 11 13:45:35 2023 +0800

    [HUDI-6515] Fix Spark2 do not support bucket bulk insert (#9163)
---
 .../apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
index 9f878dfa572..666ca3ec989 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.internal;
 
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BucketBulkInsertDataInternalWriterHelper;
 import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -39,8 +41,11 @@ public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRo
   public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
       String instantTime, int taskPartitionId, long taskId, long taskEpochId,
       StructType structType, boolean populateMetaFields, boolean arePartitionRecordsSorted) {
-    this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
-        writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted);
+    this.bulkInsertWriterHelper = writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET
+        ? new BucketBulkInsertDataInternalWriterHelper(hoodieTable,
+        writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted)
+        : new BulkInsertDataInternalWriterHelper(hoodieTable,
+        writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
   }
 
   @Override