HIVE-18288 - merge/concat not supported on Acid table (Eugene Koifman, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a2bfb8b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a2bfb8b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a2bfb8b

Branch: refs/heads/master
Commit: 4a2bfb8bed36b53b1bcc4eee0e4f916820f335f4
Parents: ab7affe
Author: Eugene Koifman <ekoif...@apache.org>
Authored: Mon May 7 11:55:16 2018 -0700
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Mon May 7 11:55:16 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   4 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |  21 ++-
 .../hadoop/hive/ql/TestTxnConcatenate.java      | 159 +++++++++++++++++++
 4 files changed, 177 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
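
Summary: 'alter table T [partition (...)] concatenate' on a full ACID or insert-only (MM) table is no longer rejected; it is now compiled into a MAJOR compaction request, blocking by default and non-blocking when hive.transactional.concatenate.noblock is set. A rough sketch of the new user-facing flow follows; the table name and the runStatement() helper are hypothetical stand-ins for a real Hive session, not part of this commit.

// Hedged sketch of the new user-facing behavior; runStatement() and the
// table name are hypothetical stand-ins for a Hive session.
public class ConcatenateBehaviorSketch {
  static void runStatement(String sql) {
    System.out.println(sql + ";");
  }

  public static void main(String[] args) {
    // Previously failed for ACID tables with CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL;
    // now enqueues a MAJOR compaction and, by default, blocks until it completes.
    runStatement("alter table acid_tbl concatenate");

    // With the new flag, the statement returns once the compaction is enqueued.
    runStatement("set hive.transactional.concatenate.noblock=true");
    runStatement("alter table acid_tbl concatenate");
  }
}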


http://git-wip-us.apache.org/repos/asf/hive/blob/4a2bfb8b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b872827..23a9c74 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2565,6 +2565,9 @@ public class HiveConf extends Configuration {
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
       "Compaction jobs will be submitted.  Set to empty string to let Hadoop choose the queue."),
 
+    TRANSACTIONAL_CONCATENATE_NOBLOCK("hive.transactional.concatenate.noblock", false,
+        "Will cause 'alter table T concatenate' to be non-blocking"),
+
     HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
         "Whether the compactor should compact insert-only tables. A safety switch."),
     /**
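
The new flag is read with the usual HiveConf accessors; a minimal sketch, assuming only the HiveConf API already used elsewhere in this commit (the class name below is hypothetical):

import org.apache.hadoop.hive.conf.HiveConf;

// Hedged sketch: reading the flag added above, mirroring the
// DDLSemanticAnalyzer change later in this commit.
public class ConcatenateFlagSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Defaults to false, i.e. 'alter table T concatenate' stays blocking.
    boolean isBlocking = !HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false);
    System.out.println("concatenate blocks: " + isBlocking);
  }
}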

http://git-wip-us.apache.org/repos/asf/hive/blob/4a2bfb8b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 5dbc478..903470a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4407,9 +4407,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throws HiveException {
     try {
       CompactionType cr = null;
-      if ("major".equals(compactType)) {
+      if ("major".equalsIgnoreCase(compactType)) {
         cr = CompactionType.MAJOR;
-      } else if ("minor".equals(compactType)) {
+      } else if ("minor".equalsIgnoreCase(compactType)) {
         cr = CompactionType.MINOR;
       } else {
         throw new RuntimeException("Unknown compaction type " + compactType);
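
This makes the compaction-type string case-insensitive, which matters because the new DDLSemanticAnalyzer path below passes "MAJOR" in upper case. A self-contained sketch of the resulting parse behavior (the class and parse() helper are hypothetical; the if/else mirrors the hunk above):

import org.apache.hadoop.hive.metastore.api.CompactionType;

// Hedged sketch of the parse behavior after this change.
public class CompactTypeParseSketch {
  static CompactionType parse(String compactType) {
    if ("major".equalsIgnoreCase(compactType)) {
      return CompactionType.MAJOR;
    } else if ("minor".equalsIgnoreCase(compactType)) {
      return CompactionType.MINOR;
    }
    throw new RuntimeException("Unknown compaction type " + compactType);
  }

  public static void main(String[] args) {
    System.out.println(parse("MAJOR")); // now accepted; previously only "major" matched
    System.out.println(parse("minor"));
  }
}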

http://git-wip-us.apache.org/repos/asf/hive/blob/4a2bfb8b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index defb8be..f0b9eda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1964,9 +1964,19 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     try {
       tblObj = getTable(tableName);
-      // TODO: we should probably block all ACID tables here.
-      if (AcidUtils.isInsertOnlyTable(tblObj.getParameters())) {
-        throw new SemanticException("Merge is not supported for MM tables");
+      if(AcidUtils.isTransactionalTable(tblObj)) {
+        LinkedHashMap<String, String> newPartSpec = null;
+        if (partSpec != null) {
+          newPartSpec = new LinkedHashMap<>(partSpec);
+        }
+
+        boolean isBlocking = !HiveConf.getBoolVar(conf,
+            ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false);
+        AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
+            tableName, newPartSpec, "MAJOR", isBlocking);
+
+        rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
+        return;
       }
       mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj));
 
@@ -2039,11 +2049,6 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
         throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED.getMsg());
       }
 
-      // transactional tables are compacted and no longer needs to be bucketed, so not safe for merge/concatenation
-      boolean isAcid = AcidUtils.isTransactionalTable(tblObj);
-      if (isAcid) {
-        throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL.getMsg());
-      }
       inputDir.add(oldTblPartLoc);
 
       mergeDesc.setInputDir(inputDir);
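
Net effect of the two hunks above: transactional tables are routed to the compactor instead of the file-merge task, and the old error path is removed. A hedged condensation of the routing decision (the enum and route() method below are illustration only, not part of the commit):

import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical condensation of the analyzer branch added above.
public class ConcatenateRoutingSketch {
  enum Route { MAJOR_COMPACTION_BLOCKING, MAJOR_COMPACTION_ASYNC, FILE_MERGE_TASK }

  static Route route(boolean isTransactionalTable, HiveConf conf) {
    if (isTransactionalTable) {
      boolean noBlock = HiveConf.getBoolVar(conf,
          HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false);
      return noBlock ? Route.MAJOR_COMPACTION_ASYNC : Route.MAJOR_COMPACTION_BLOCKING;
    }
    // Non-transactional managed tables keep the original merge-files path.
    return Route.FILE_MERGE_TASK;
  }
}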

http://git-wip-us.apache.org/repos/asf/hive/blob/4a2bfb8b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
new file mode 100644
index 0000000..92bcefe
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -0,0 +1,159 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class TestTxnConcatenate extends TxnCommandsBaseForTests {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnConcatenate.class);
+  private static final String TEST_DATA_DIR =
+      new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestTxnLoadData.class.getCanonicalName()
+          + "-" + System.currentTimeMillis()
+      ).getPath().replaceAll("\\\\", "/");
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+
+  @Test
+  public void testConcatenate() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2),(4,5)");
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 4");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(5,6),(8,8)");
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBL + " order by a, b";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+            "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+            "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /* In UTs there is no standalone HMS running to kick off compaction, so it is done via runWorker();
+       in normal usage 'concatenate' is blocking. */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtbl/base_0000003/bucket_00001"}};
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+  @Test
+  public void testConcatenatePart() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " values(1,2,'p1'),(4,5,'p2')");
+    runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 4 where p='p1'");
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " values(5,6,'p1'),(8,8,'p2')");
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBLPART + " order by a, b";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
+            "acidtblpart/p=p1/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
+            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
+            "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /* In UTs there is no standalone HMS running to kick off compaction, so it is done via runWorker();
+       in normal usage 'concatenate' is blocking. */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " PARTITION(p='p1') concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
+            "acidtblpart/p=p1/base_0000003/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
+            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
+            "acidtblpart/p=p1/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+
+  @Test
+  public void testConcatenateMM() throws Exception {
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T(a int, b int)");
+    runStatementOnDriver("insert into T values(1,2),(4,5)");
+    runStatementOnDriver("insert into T values(5,6),(8,8)");
+    String testQuery = "select a, b, INPUT__FILE__NAME from T order by a, b";
+    String[][] expected = new String[][] {
+        {"1\t2",
+            "t/delta_0000001_0000001_0000/000000_0"},
+        {"4\t5",
+            "t/delta_0000001_0000001_0000/000000_0"},
+        {"5\t6",
+            "t/delta_0000002_0000002_0000/000000_0"},
+        {"8\t8",
+            "t/delta_0000002_0000002_0000/000000_0"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /* In UTs there is no standalone HMS running to kick off compaction, so it is done via runWorker();
+       in normal usage 'concatenate' is blocking. */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table T concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"1\t2",
+            "t/base_0000002/000000_0"},
+        {"4\t5",
+            "t/base_0000002/000000_0"},
+        {"5\t6",
+            "t/base_0000002/000000_0"},
+        {"8\t8",
+            "t/base_0000002/000000_0"}};
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+}
