Repository: hive
Updated Branches:
  refs/heads/branch-2.2 0e795debd -> c6e60d1e3
  refs/heads/branch-2.3 f4d288fad -> 0c56cf696


HIVE-17562: ACID 1.0 + ETL strategy should treat empty compacted files as uncovered deltas (Prasanth Jayachandran reviewed by Eugene Koifman)
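
Context for the fix: a major compaction that runs after every row has been deleted writes a base file containing zero ORC stripes. The ETL split strategy previously produced no splits for such a file while still treating the bucket as "covered", so the bucket's delta files were skipped and rows inserted after the compaction could disappear from reads. Below is a minimal, self-contained sketch of the core idea, with illustrative names only (EmptyBaseSketch and splitsForBaseFile are not Hive API): a base file with no stripes should mark its bucket as uncovered so the bucket's deltas are read instead.

import java.util.ArrayList;
import java.util.List;

public class EmptyBaseSketch {

  /** Stand-in for one ORC stripe's byte range. */
  static class Stripe {
    final long offset;
    final long length;
    Stripe(long offset, long length) { this.offset = offset; this.length = length; }
  }

  /**
   * Splits for a single base file. A base file with no stripes (e.g. a major
   * compaction ran after every row was deleted) does not really cover its
   * bucket, so the bucket is flagged uncovered and its delta splits are
   * returned instead of being silently dropped.
   */
  static List<String> splitsForBaseFile(List<Stripe> stripes, int bucket,
      boolean[] covered, List<String> deltaSplits) {
    List<String> splits = new ArrayList<>();
    if (stripes == null || stripes.isEmpty()) {
      covered[bucket] = false;      // the empty base does NOT cover this bucket
      splits.addAll(deltaSplits);   // read the now-uncovered deltas instead
      return splits;
    }
    for (Stripe s : stripes) {
      splits.add("split@" + s.offset + "+" + s.length);
    }
    splits.addAll(deltaSplits);     // uncovered delta splits always ride along
    return splits;
  }

  public static void main(String[] args) {
    boolean[] covered = {true, true};
    List<String> deltas = new ArrayList<>();
    deltas.add("delta_0000001_0000001/bucket_00000");
    // Empty base file: the delta split survives and the bucket is uncovered.
    System.out.println(splitsForBaseFile(new ArrayList<Stripe>(), 0, covered, deltas));
    System.out.println("covered[0] = " + covered[0]);   // prints: covered[0] = false
  }
}

In the patch itself this decision lives in OrcInputFormat's split generator (first hunk below): when stripes is empty, the bucket parsed from the file name is flagged covered[bucket] = false and deltaSplits is re-derived from splitInfo.getSplits().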


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b71a88f8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b71a88f8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b71a88f8

Branch: refs/heads/branch-2.2
Commit: b71a88f8e049bf2996fdf51de2aba0d68f4f28bb
Parents: 0e795de
Author: Prasanth Jayachandran <prasan...@apache.org>
Authored: Wed Sep 27 12:57:33 2017 -0700
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Thu Oct 5 10:37:39 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  58 +++++---
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 135 +++++++++++++++++++
 2 files changed, 172 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
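
The test changes below exercise both the BI and ETL ORC split strategies by pinning hive.exec.orc.split.strategy for the duration of a test and resetting it to HYBRID at the end. A small sketch of that toggle, using only HiveConf calls that already appear in the diff; the try/finally wrapper is an editorial suggestion so the value is restored even when an assertion fails, not part of the patch:

import org.apache.hadoop.hive.conf.HiveConf;

public class SplitStrategySketch {

  /** Pins hive.exec.orc.split.strategy while body runs, then restores it. */
  static void withSplitStrategy(HiveConf conf, String strategy, Runnable body) {
    String previous = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
    conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, strategy);
    try {
      body.run();
    } finally {
      // Restored even if an assertion inside body throws.
      conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, previous);
    }
  }
}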


http://git-wip-us.apache.org/repos/asf/hive/blob/b71a88f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fc7bbe4..44b5011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1122,7 +1122,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final boolean hasBase;
     private OrcFile.WriterVersion writerVersion;
     private long projColsUncompressedSize;
-    private final List<OrcSplit> deltaSplits;
+    private List<OrcSplit> deltaSplits;
+    private final SplitInfo splitInfo;
     private final ByteBuffer ppdResult;
     private final UserGroupInformation ugi;
     private final boolean allowSyntheticFileIds;
@@ -1145,6 +1146,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.hasBase = splitInfo.hasBase;
       this.projColsUncompressedSize = -1;
       this.deltaSplits = splitInfo.getSplits();
+      this.splitInfo = splitInfo;
       this.allowSyntheticFileIds = allowSyntheticFileIds;
       this.ppdResult = splitInfo.ppdResult;
     }
@@ -1319,6 +1321,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                 stripeStats, stripes.size(), file.getPath(), evolution);
           }
         }
+
         return generateSplitsFromStripes(includeStripe);
       }
     }
@@ -1351,31 +1354,44 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     private List<OrcSplit> generateSplitsFromStripes(boolean[] includeStripe) throws IOException {
       List<OrcSplit> splits = new ArrayList<>(stripes.size());
-      // if we didn't have predicate pushdown, read everything
-      if (includeStripe == null) {
-        includeStripe = new boolean[stripes.size()];
-        Arrays.fill(includeStripe, true);
-      }
 
-      OffsetAndLength current = new OffsetAndLength();
-      int idx = -1;
-      for (StripeInformation stripe : stripes) {
-        idx++;
-
-        if (!includeStripe[idx]) {
-          // create split for the previous unfinished stripe
-          if (current.offset != -1) {
-            splits.add(createSplit(current.offset, current.length, orcTail));
-            current.offset = -1;
-          }
-          continue;
+      // After major compaction, base files may become empty. The following sequence is an example:
+      // 1) insert some rows
+      // 2) delete all rows
+      // 3) major compaction
+      // 4) insert some rows
+      // In such cases, treat base files without any stripes as uncovered deltas.
+      if (stripes == null || stripes.isEmpty()) {
+        AcidOutputFormat.Options options = AcidUtils.parseBaseOrDeltaBucketFilename(file.getPath(), context.conf);
+        int bucket = options.getBucket();
+        splitInfo.covered[bucket] = false;
+        deltaSplits = splitInfo.getSplits();
+      } else {
+        // if we didn't have predicate pushdown, read everything
+        if (includeStripe == null) {
+          includeStripe = new boolean[stripes.size()];
+          Arrays.fill(includeStripe, true);
         }
 
-        current = generateOrUpdateSplit(
+        OffsetAndLength current = new OffsetAndLength();
+        int idx = -1;
+        for (StripeInformation stripe : stripes) {
+          idx++;
+
+          if (!includeStripe[idx]) {
+            // create split for the previous unfinished stripe
+            if (current.offset != -1) {
+              splits.add(createSplit(current.offset, current.length, orcTail));
+              current.offset = -1;
+            }
+            continue;
+          }
+
+          current = generateOrUpdateSplit(
             splits, current, stripe.getOffset(), stripe.getLength(), orcTail);
+        }
+        generateLastSplit(splits, current, orcTail);
       }
-      generateLastSplit(splits, current, orcTail);
-
       // Add uncovered ACID delta splits.
       splits.addAll(deltaSplits);
       return splits;

http://git-wip-us.apache.org/repos/asf/hive/blob/b71a88f8/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index ddd59a1..6d6b54a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -312,6 +312,141 @@ public class TestTxnCommands2 {
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
 
+  @Test
+  public void testBICompactedNoStripes() throws Exception {
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] resultData = new int[][] {{1,2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "HYBRID");
+  }
+
+  @Test
+  public void testETLCompactedNoStripes() throws Exception {
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] resultData = new int[][] {{1,2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "HYBRID");
+  }
+
+  /**
+   * see HIVE-16177
+   * See also {@link TestTxnCommands#testNonAcidToAcidConversion01()}
+   */
+  @Test
+  public void testNonAcidToAcidConversion02() throws Exception {
+    //create 2 rows in a file 000001_0 (and an empty 000000_0)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(1,2),(1,3)");
+    //create 2 rows in a file 000000_0_copy1 and 2 rows in a file 
000001_0_copy1
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(0,12),(0,13),(1,4),(1,5)");
+    //create 1 row in a file 000001_0_copy2 (and empty 000000_0_copy2?)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(1,6)");
+
+    //convert the table to Acid
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET 
TBLPROPERTIES ('transactional'='true')");
+    List<String> rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL);
+    //create a some of delta directories
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(0,15),(1,16)");
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = 120 where 
a = 0 and b = 12");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(0,17)");
+    runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1 
and b = 3");
+
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, 
INPUT__FILE__NAME from " +  Table.NONACIDORCTBL + " order by a,b");
+    LOG.warn("before compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    /*
+     * All ROW__IDs are unique on read after conversion to acid 
+     * ROW__IDs are exactly the same before and after compaction
+     * Also check the file name after compaction for completeness
+     */
+    String[][] expected = {
+      {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13",  
"bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":0,\"rowid\":0}\t0\t15", 
"bucket_00000"},
+      {"{\"transactionid\":3,\"bucketid\":0,\"rowid\":0}\t0\t17", 
"bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", 
"bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2",   
"bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4",   
"bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5",   
"bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6",   
"bucket_00001"},
+      {"{\"transactionid\":1,\"bucketid\":1,\"rowid\":0}\t1\t16", 
"bucket_00001"}
+    };
+    Assert.assertEquals("Unexpected row count before compaction", 
expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), 
rs.get(i).startsWith(expected[i][0]));
+    }
+    //run Compaction
+    runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL 
+" compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDORCTBL + " order by a,b");
+    LOG.warn("after compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after compaction", 
expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), 
rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(bucket) " + i + " ac: " + rs.get(i), 
rs.get(i).endsWith(expected[i][1]));
+    }
+    //make sure they are the same before and after compaction
+  }
+
   /**
    * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
    * 1. Insert a row to Non-ACID table
