This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 96d46dc36cf HIVE-27951: hcatalog dynamic partitioning fails with 
partition already exist error when exist parent partitions path (#4937)
96d46dc36cf is described below

commit 96d46dc36cfd3a68c73f8c77e1f97c1c78507b24
Author: yigress <104102129+yigr...@users.noreply.github.com>
AuthorDate: Wed Jan 3 17:03:59 2024 -0800

    HIVE-27951: hcatalog dynamic partitioning fails with partition already 
exist error when exist parent partitions path (#4937)
---
 .../mapreduce/FileOutputCommitterContainer.java    | 37 ++++++++++++++--------
 .../mapreduce/TestHCatDynamicPartitioned.java      | 17 ++++++----
 .../TestHCatExternalDynamicPartitioned.java        |  4 +--
 3 files changed, 37 insertions(+), 21 deletions(-)

diff --git 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index de9ad252ff2..2ad306165d1 100644
--- 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -488,7 +488,7 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
   }
 
   /**
-   * Move all of the files from the temp directory to the final location
+   * Move task output from the temp directory to the final location
    * @param srcf the file to move
    * @param srcDir the source directory
    * @param destDir the target directory
@@ -538,17 +538,17 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
         final Path finalOutputPath = getFinalPath(destFs, srcF, srcDir, 
destDir, immutable);
         if (immutable && destFs.exists(finalOutputPath) &&
             
!org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, 
finalOutputPath)) {
-          throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
-              "Data already exists in " + finalOutputPath
-                  + ", duplicate publish not possible.");
-        }
-        if (srcStatus.isDirectory()) {
+          if (partitionsDiscoveredByPath.containsKey(srcF.toString())) {
+            throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+                "Data already exists in " + finalOutputPath
+                    + ", duplicate publish not possible.");
+          }
+          // parent directory may exist for multi-partitions, check lower 
level partitions
+          Collections.addAll(srcQ, 
srcFs.listStatus(srcF,HIDDEN_FILES_PATH_FILTER));
+        } else if (srcStatus.isDirectory()) {
           if (canRename && dynamicPartitioningUsed) {
             // If it is partition, move the partition directory instead of 
each file.
-            // If custom dynamic location provided, need to rename to final 
output path
-            final Path parentDir = finalOutputPath.getParent();
-            Path dstPath = !customDynamicLocationUsed ? parentDir : 
finalOutputPath;
-            moves.add(Pair.of(srcF, dstPath));
+            moves.add(Pair.of(srcF, finalOutputPath));
           } else {
             Collections.addAll(srcQ, srcFs.listStatus(srcF, 
HIDDEN_FILES_PATH_FILTER));
           }
@@ -558,16 +558,27 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
       }
     }
 
-    if (moves.isEmpty()) {
+    bulkMoveFiles(conf, srcFs, destFs, moves);
+  }
+
+  /**
+   * Bulk move files from source to destination.
+   * @param srcFs the source filesystem where the source files are
+   * @param destFs the destination filesystem where the destination files are
+   * @param pairs list of pairs of <source_path, destination_path>, move 
source_path to destination_path
+   * @throws java.io.IOException
+   */
+  private void bulkMoveFiles(final Configuration conf, final FileSystem srcFs, 
final FileSystem destFs, List<Pair<Path, Path>> pairs) throws IOException{
+    if (pairs.isEmpty()) {
       return;
     }
-
+    final boolean canRename = srcFs.getUri().equals(destFs.getUri());
     final List<Future<Pair<Path, Path>>> futures = new LinkedList<>();
     final ExecutorService pool = 
conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
         
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname,
 25),
             new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) 
: null;
 
-    for (final Pair<Path, Path> pair: moves){
+    for (final Pair<Path, Path> pair: pairs){
       Path srcP = pair.getLeft();
       Path dstP = pair.getRight();
       final String msg = "Unable to move source " + srcP + " to destination " 
+ dstP;
diff --git 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index a97162de993..5ee3a6348d1 100644
--- 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -52,13 +52,13 @@ public class TestHCatDynamicPartitioned extends 
HCatMapReduceTest {
   private static List<HCatFieldSchema> dataColumns;
   private static final Logger LOG = 
LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
   protected static final int NUM_RECORDS = 20;
-  protected static final int NUM_PARTITIONS = 5;
+  protected static final int NUM_TOP_PARTITIONS = 5;
 
   public TestHCatDynamicPartitioned(String formatName, String serdeClass, 
String inputFormatClass,
       String outputFormatClass) throws Exception {
     super(formatName, serdeClass, inputFormatClass, outputFormatClass);
     tableName = "testHCatDynamicPartitionedTable_" + formatName;
-    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+    generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
     generateDataColumns();
   }
 
@@ -67,6 +67,8 @@ public class TestHCatDynamicPartitioned extends 
HCatMapReduceTest {
     dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", 
serdeConstants.INT_TYPE_NAME, "")));
     dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", 
serdeConstants.STRING_TYPE_NAME, "")));
     dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", 
serdeConstants.STRING_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p2", 
serdeConstants.STRING_TYPE_NAME, "")));
+
   }
 
   protected static void generateWriteRecords(int max, int mod, int offset) {
@@ -78,6 +80,7 @@ public class TestHCatDynamicPartitioned extends 
HCatMapReduceTest {
       objList.add(i);
       objList.add("strvalue" + i);
       objList.add(String.valueOf((i % mod) + offset));
+      objList.add(String.valueOf((i / (max/2)) + offset));
       writeRecords.add(new DefaultHCatRecord(objList));
     }
   }
@@ -86,6 +89,7 @@ public class TestHCatDynamicPartitioned extends 
HCatMapReduceTest {
   protected List<FieldSchema> getPartitionKeys() {
     List<FieldSchema> fields = new ArrayList<FieldSchema>();
     fields.add(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, ""));
+    fields.add(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, ""));
     return fields;
   }
 
@@ -117,8 +121,9 @@ public class TestHCatDynamicPartitioned extends 
HCatMapReduceTest {
 
   protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
       String customDynamicPathPattern) throws Exception {
-    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-    runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, 
asSingleMapTask, customDynamicPathPattern);
+    generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+    runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), 
NUM_RECORDS/2, true, asSingleMapTask, customDynamicPathPattern);
+    runMRCreate(null, dataColumns, 
writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2, true, 
asSingleMapTask, customDynamicPathPattern);
 
     runMRRead(NUM_RECORDS);
 
@@ -140,7 +145,7 @@ public class TestHCatDynamicPartitioned extends 
HCatMapReduceTest {
     //Test for duplicate publish
     IOException exc = null;
     try {
-      generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+      generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
       Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, 
false,
           true, customDynamicPathPattern);
 
@@ -167,7 +172,7 @@ public class TestHCatDynamicPartitioned extends 
HCatMapReduceTest {
     driver.run(query);
     res = new ArrayList<String>();
     driver.getResults(res);
-    assertEquals(NUM_PARTITIONS, res.size());
+    assertEquals(NUM_TOP_PARTITIONS*2, res.size());
 
     query = "select * from " + tableName;
     driver.run(query);
diff --git 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
index 18fcfdbdd2a..9698f178a8e 100644
--- 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
+++ 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
@@ -28,7 +28,7 @@ public class TestHCatExternalDynamicPartitioned extends 
TestHCatDynamicPartition
       throws Exception {
     super(formatName, serdeClass, inputFormatClass, outputFormatClass);
     tableName = "testHCatExternalDynamicPartitionedTable_" + formatName;
-    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+    generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
     generateDataColumns();
   }
 
@@ -43,7 +43,7 @@ public class TestHCatExternalDynamicPartitioned extends 
TestHCatDynamicPartition
    */
   @Test
   public void testHCatExternalDynamicCustomLocation() throws Exception {
-    runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}");
+    runHCatDynamicPartitionedTable(true, 
"mapred/externalDynamicOutput/${p1}/{p2}");
   }
 
 }

Reply via email to