This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0258a89112a [HUDI-6515] Fix Spark2 not supporting bucket bulk insert (#9163)
0258a89112a is described below

commit 0258a89112a6071a8074757236e19a7b27539dbd
Author: StreamingFlames <18889897...@163.com>
AuthorDate: Tue Jul 11 13:45:35 2023 +0800

    [HUDI-6515] Fix Spark2 not supporting bucket bulk insert (#9163)
---
 .../apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
index 9f878dfa572..666ca3ec989 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.internal;
 
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BucketBulkInsertDataInternalWriterHelper;
 import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -39,8 +41,11 @@ public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRo
   public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                             String instantTime, int taskPartitionId, long taskId, long taskEpochId,
                                             StructType structType, boolean populateMetaFields, boolean arePartitionRecordsSorted) {
-    this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
-        writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted);
+    this.bulkInsertWriterHelper = writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET
+        ? new BucketBulkInsertDataInternalWriterHelper(hoodieTable,
+        writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted)
+        : new BulkInsertDataInternalWriterHelper(hoodieTable,
+        writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
   }
 
   @Override
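
For readers following along: after this change, any write config whose index type is BUCKET routes Spark2 bulk inserts through BucketBulkInsertDataInternalWriterHelper instead of the default helper. Below is a minimal sketch (not part of this commit) of a write config that would exercise the new branch; the class name BucketConfigExample and the base path are hypothetical, and it assumes the standard HoodieWriteConfig/HoodieIndexConfig builder APIs.

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;

    // Hypothetical example class, not part of the commit.
    public class BucketConfigExample {
      // Builds a write config with the BUCKET index type, so the patched
      // HoodieBulkInsertDataInternalWriter above selects
      // BucketBulkInsertDataInternalWriterHelper rather than the default
      // BulkInsertDataInternalWriterHelper.
      public static HoodieWriteConfig bucketWriteConfig(String basePath) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.BUCKET)
                .build())
            .build();
      }
    }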
