This is an automated email from the ASF dual-hosted git repository.

vgumashta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 313e49f HIVE-20699: Query based compactor for full CRUD Acid tables (Vaibhav Gumashta reviewed by Eugene Koifman) 313e49f is described below commit 313e49f6b706555a16288fab50c79b7aedf7ba77 Author: Vaibhav Gumashta <vgumas...@apache.org> AuthorDate: Mon Feb 4 17:42:02 2019 -0800 HIVE-20699: Query based compactor for full CRUD Acid tables (Vaibhav Gumashta reviewed by Eugene Koifman) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 7 + .../org/apache/hadoop/hive/ql/TestAcidOnTez.java | 54 ++- .../ql/txn/compactor/TestCrudCompactorOnTez.java | 429 +++++++++++++++++++++ .../hadoop/hive/ql/exec/FunctionRegistry.java | 1 + .../hive/ql/exec/tez/HiveSplitGenerator.java | 2 +- .../hadoop/hive/ql/exec/tez/SplitGrouper.java | 164 +++++++- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 8 +- .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 3 - .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 2 +- .../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java | 38 +- .../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java | 6 + .../hadoop/hive/ql/txn/compactor/CompactorMR.java | 199 +++++++++- .../hadoop/hive/ql/txn/compactor/Initiator.java | 6 + .../generic/GenericUDFValidateAcidSortOrder.java | 100 +++++ .../results/clientpositive/show_functions.q.out | 2 + 15 files changed, 987 insertions(+), 34 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 414070e..a3b03ca 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2705,6 +2705,13 @@ public class HiveConf extends Configuration { HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true, "Whether the compactor should compact insert-only tables. A safety switch."), + COMPACTOR_CRUD_QUERY_BASED("hive.compactor.crud.query.based", false, + "Means Major compaction on full CRUD tables is done as a query, " + + "and minor compaction will be disabled."), + SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new StringSet("query", "compactor"), + "This is set to compactor from within the query based compactor. 
This enables the Tez SplitGrouper " + + "to group splits based on their bucket number, so that all rows from different bucket files " + + " for the same bucket number can end up in the same bucket file after the compaction."), /** * @deprecated Use MetastoreConf.COMPACTOR_HISTORY_RETENTION_SUCCEEDED */ diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index d6a4191..142c2d2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -121,10 +121,15 @@ public class TestAcidOnTez { SessionState.start(new SessionState(hiveConf)); d = DriverFactory.newDriver(hiveConf); dropTables(); - runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc " + getTblProperties()); - runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc " + getTblProperties()); - runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc "); - runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc "); + runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + + " buckets stored as orc " + getTblProperties()); + runStatementOnDriver("create table " + Table.ACIDTBLPART + + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc " + + getTblProperties()); + runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + + " buckets stored as orc "); + runStatementOnDriver("create table " + Table.NONACIDPART + + "(a int, b int) partitioned by (p string) stored as orc "); runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)"); runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(5,6)"); @@ -831,6 +836,42 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h // No transactions - just the header row assertEquals(1, rows.size()); } + + /** + * HIVE-20699 + * + * see TestTxnCommands3.testCompactor + */ + @Test + public void testCrudMajorCompactionSplitGrouper() throws Exception { + String tblName = "test_split_grouper"; + // make a clone of existing hive conf + HiveConf confForTez = new HiveConf(hiveConf); + setupTez(confForTez); // one-time setup to make query able to run with Tez + HiveConf.setVar(confForTez, HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + runStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets " + + "stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='default')", confForTez); + runStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", confForTez); + runStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", confForTez); + runStatementOnDriver("delete from " + tblName + " where b = 2"); + List<String> expectedRs = new ArrayList<>(); + 
expectedRs.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4"); + expectedRs.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3"); + expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4"); + expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3"); + expectedRs.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4"); + expectedRs.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3"); + expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4"); + expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3"); + List<String> rs = + runStatementOnDriver("select ROW__ID, * from " + tblName + " order by ROW__ID.bucketid, ROW__ID", confForTez); + HiveConf.setVar(confForTez, HiveConf.ConfVars.SPLIT_GROUPING_MODE, "compactor"); + // No order by needed: this should use the compactor split grouping to return the rows in correct order + List<String> rsCompact = runStatementOnDriver("select ROW__ID, * from " + tblName, confForTez); + Assert.assertEquals("normal read", expectedRs, rs); + Assert.assertEquals("compacted read", rs, rsCompact); + } private void restartSessionAndDriver(HiveConf conf) throws Exception { SessionState ss = SessionState.get(); @@ -910,11 +951,16 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h private void setupTez(HiveConf conf) { conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez"); conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR); + conf.set("tez.am.resource.memory.mb", "128"); + conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled"); conf.setBoolean("tez.local.mode", true); conf.set("fs.defaultFS", "file:///"); conf.setBoolean("tez.runtime.optimize.local.fetch", true); conf.set("tez.staging-dir", TEST_DATA_DIR); conf.setBoolean("tez.ignore.lib.uris", true); + conf.set("hive.tez.container.size", "128"); + conf.setBoolean("hive.merge.tezfiles", false); + conf.setBoolean("hive.in.tez.test", true); } private void setupMapJoin(HiveConf conf) { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java new file mode 100644 index 0000000..d59cfe5 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.orc.OrcConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("deprecation") +// TODO: Add tests for bucketing_version=1 when HIVE-21167 is fixed +public class TestCrudCompactorOnTez { + private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt()); + private static final Logger LOG = LoggerFactory.getLogger(TestCrudCompactorOnTez.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + + TestCrudCompactorOnTez.class.getCanonicalName() + "-" + System.currentTimeMillis() + "_" + salt + .getAndIncrement()).getPath().replaceAll("\\\\", "/"); + private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + private HiveConf conf; + IMetaStoreClient msClient; + private IDriver driver; + + @Before + // Note: we create a new conf and driver object before every test + public void setup() throws Exception { + File f = new File(TEST_WAREHOUSE_DIR); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) { + throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); + } + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, ""); + hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, ""); + hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + TxnDbUtil.setConfValues(hiveConf); + TxnDbUtil.cleanDb(hiveConf); + TxnDbUtil.prepDb(hiveConf); + conf = hiveConf; + // Use 
tez as execution engine for this test class + setupTez(conf); + msClient = new HiveMetaStoreClient(conf); + driver = DriverFactory.newDriver(conf); + SessionState.start(new CliSessionState(conf)); + } + + private void setupTez(HiveConf conf) { + conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez"); + conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR); + conf.set("tez.am.resource.memory.mb", "128"); + conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled"); + conf.setBoolean("tez.local.mode", true); + conf.set("fs.defaultFS", "file:///"); + conf.setBoolean("tez.runtime.optimize.local.fetch", true); + conf.set("tez.staging-dir", TEST_DATA_DIR); + conf.setBoolean("tez.ignore.lib.uris", true); + conf.set("hive.tez.container.size", "128"); + conf.setBoolean("hive.merge.tezfiles", false); + conf.setBoolean("hive.in.tez.test", true); + } + + @After + public void tearDown() { + if (msClient != null) { + msClient.close(); + } + if (driver != null) { + driver.close(); + } + conf = null; + } + + @Test + public void testMajorCompaction() throws Exception { + String dbName = "default"; + String tblName = "testMajorCompaction"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='default')", driver); + executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver); + executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver); + executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present + FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] deltas = new String[filestatus.length]; + for (int i = 0; i < deltas.length; i++) { + deltas[i] = filestatus[i].getPath().getName(); + } + Arrays.sort(deltas); + String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" }; + if (!Arrays.deepEquals(expectedDeltas, deltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); + } + // Verify that delete delta (delete_delta_0000003_0000003_0000) is present + FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()), + AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" }; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + List<String> expectedRsBucket0 = new ArrayList<>(); + expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4"); + 
expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3"); + expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4"); + expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3"); + List<String> expectedRsBucket1 = new ArrayList<>(); + expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4"); + expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3"); + expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4"); + expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3"); + // Bucket 0 + List<String> rsBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName + + " where ROW__ID.bucketid = 536870912 order by ROW__ID", driver); + Assert.assertEquals("normal read", expectedRsBucket0, rsBucket0); + // Bucket 1 + List<String> rsBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName + + " where ROW__ID.bucketid = 536936448 order by ROW__ID", driver); + Assert.assertEquals("normal read", expectedRsBucket1, rsBucket1); + // Run major compaction and cleaner + runCompaction(dbName, tblName, CompactionType.MAJOR); + runCleaner(conf); + // Should contain only one base directory now + filestatus = fs.listStatus(new Path(table.getSd().getLocation())); + String[] bases = new String[filestatus.length]; + for (int i = 0; i < bases.length; i++) { + bases[i] = filestatus[i].getPath().getName(); + } + Arrays.sort(bases); + String[] expectedBases = new String[] { "base_0000003_v0000008" }; + if (!Arrays.deepEquals(expectedBases, bases)) { + Assert.fail("Expected: " + Arrays.toString(expectedBases) + ", found: " + Arrays.toString(bases)); + } + // Bucket 0 + List<String> rsCompactBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName + + " where ROW__ID.bucketid = 536870912", driver); + Assert.assertEquals("compacted read", rsBucket0, rsCompactBucket0); + // Bucket 1 + List<String> rsCompactBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName + + " where ROW__ID.bucketid = 536936448", driver); + Assert.assertEquals("compacted read", rsBucket1, rsCompactBucket1); + // Clean up + executeStatementOnDriver("drop table " + tblName, driver); + } + + @Test + public void testMinorCompactionDisabled() throws Exception { + String dbName = "default"; + String tblName = "testMinorCompactionDisabled"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='default')", driver); + executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver); + executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver); + executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present + FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] deltas = new 
String[filestatus.length]; + for (int i = 0; i < deltas.length; i++) { + deltas[i] = filestatus[i].getPath().getName(); + } + Arrays.sort(deltas); + String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" }; + if (!Arrays.deepEquals(expectedDeltas, deltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); + } + // Verify that delete delta (delete_delta_0000003_0000003_0000) is present + FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()), + AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" }; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + // Initiate a compaction, make sure it's not queued + runInitiator(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(0, compacts.size()); + // Clean up + executeStatementOnDriver("drop table " + tblName, driver); + } + + @Test + public void testCompactionWithSchemaEvolutionAndBuckets() throws Exception { + String dbName = "default"; + String tblName = "testCompactionWithSchemaEvolutionAndBuckets"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + + " (a int, b int) partitioned by(ds string) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='default')", driver); + // Insert some data + executeStatementOnDriver("insert into " + tblName + + " partition (ds) values(1,2,'today'),(1,3,'today'),(1,4,'yesterday'),(2,2,'yesterday'),(2,3,'today'),(2,4,'today')", + driver); + // Add a new column + executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver); + // Insert more data + executeStatementOnDriver("insert into " + tblName + + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today')," + + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver); + executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + // Run major compaction and cleaner + runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today"); + runCleaner(conf); + List<String> expectedRsBucket0PtnToday = new ArrayList<>(); + expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t3\tNULL\ttoday"); + expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t4\tNULL\ttoday"); + expectedRsBucket0PtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t3\t1001\ttoday"); + List<String> expectedRsBucket1PtnToday = new ArrayList<>(); + expectedRsBucket1PtnToday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3\tNULL\ttoday"); + expectedRsBucket1PtnToday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t4\t4\t1005\ttoday"); + // Bucket 0, partition 'today' + List<String> rsCompactBucket0PtnToday = 
executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + + tblName + " where ROW__ID.bucketid = 536870912 and ds='today'", driver); + Assert.assertEquals("compacted read", expectedRsBucket0PtnToday, rsCompactBucket0PtnToday); + // Bucket 1, partition 'today' + List<String> rsCompactBucket1PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + + tblName + " where ROW__ID.bucketid = 536936448 and ds='today'", driver); + Assert.assertEquals("compacted read", expectedRsBucket1PtnToday, rsCompactBucket1PtnToday); + // Clean up + executeStatementOnDriver("drop table " + tblName, driver); + } + + @Test + public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws Exception { + HiveConf hiveConf = new HiveConf(conf); + hiveConf.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2); + hiveConf.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2); + driver = DriverFactory.newDriver(hiveConf); + String dbName = "default"; + String tblName = "testCompactionWithSchemaEvolutionNoBucketsMultipleReducers"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) partitioned by(ds string)" + + " stored as ORC TBLPROPERTIES('transactional'='true'," + " 'transactional_properties'='default')", driver); + // Insert some data + executeStatementOnDriver("insert into " + tblName + + " partition (ds) values(1,2,'today'),(1,3,'today'),(1,4,'yesterday'),(2,2,'yesterday'),(2,3,'today'),(2,4,'today')", + driver); + // Add a new column + executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver); + // Insert more data + executeStatementOnDriver("insert into " + tblName + + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today')," + + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver); + executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + // Run major compaction and cleaner + runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today"); + runCleaner(hiveConf); + List<String> expectedRsPtnToday = new ArrayList<>(); + expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t3\tNULL\ttoday"); + expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\tNULL\ttoday"); + expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\tNULL\ttoday"); + expectedRsPtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t3\t1001\ttoday"); + expectedRsPtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":2}\t4\t4\t1005\ttoday"); + List<String> expectedRsPtnYesterday = new ArrayList<>(); + expectedRsPtnYesterday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4\tNULL\tyesterday"); + expectedRsPtnYesterday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t3\t4\t1002\tyesterday"); + expectedRsPtnYesterday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":2}\t4\t3\t1004\tyesterday"); + // Partition 'today' + List<String> rsCompactPtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName + + " where ds='today'", driver); + Assert.assertEquals("compacted read", expectedRsPtnToday, rsCompactPtnToday); + // Partition 'yesterday' + List<String> rsCompactPtnYesterday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName + + " where ds='yesterday'", driver); + Assert.assertEquals("compacted read", expectedRsPtnYesterday, 
rsCompactPtnYesterday); + // Clean up + executeStatementOnDriver("drop table " + tblName, driver); + } + + private void runCompaction(String dbName, String tblName, CompactionType compactionType, String... partNames) + throws Exception { + HiveConf hiveConf = new HiveConf(conf); + hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setConf(hiveConf); + t.init(new AtomicBoolean(true), new AtomicBoolean()); + if (partNames.length == 0) { + txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType)); + t.run(); + } else { + for (String partName : partNames) { + CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType); + cr.setPartitionname(partName); + txnHandler.compact(cr); + t.run(); + } + } + } + + static void runInitiator(HiveConf hConf) throws Exception { + HiveConf hiveConf = new HiveConf(hConf); + hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + AtomicBoolean stop = new AtomicBoolean(true); + Initiator t = new Initiator(); + t.setThreadId((int) t.getId()); + t.setConf(hiveConf); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + } + + static void runWorker(HiveConf hConf) throws Exception { + HiveConf hiveConf = new HiveConf(hConf); + hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + AtomicBoolean stop = new AtomicBoolean(true); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setConf(hiveConf); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + } + + static void runCleaner(HiveConf hConf) throws Exception { + HiveConf hiveConf = new HiveConf(hConf); + AtomicBoolean stop = new AtomicBoolean(true); + Cleaner t = new Cleaner(); + t.setThreadId((int) t.getId()); + t.setConf(hiveConf); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + } + + /** + * Execute Hive CLI statement + * + * @param cmd arbitrary statement to execute + */ + static void executeStatementOnDriver(String cmd, IDriver driver) throws Exception { + LOG.debug("Executing: " + cmd); + CommandProcessorResponse cpr = driver.run(cmd); + if (cpr.getResponseCode() != 0) { + throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr); + } + } + + static List<String> executeStatementOnDriverAndReturnResults(String cmd, IDriver driver) throws Exception { + LOG.debug("Executing: " + cmd); + CommandProcessorResponse cpr = driver.run(cmd); + if (cpr.getResponseCode() != 0) { + throw new IOException("Failed to execute \"" + cmd + "\". 
Driver returned: " + cpr); + } + List<String> rs = new ArrayList<String>(); + driver.getResults(rs); + return rs; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index e7aa041..9ff0107 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -288,6 +288,7 @@ public final class FunctionRegistry { system.registerGenericUDF("split", GenericUDFSplit.class); system.registerGenericUDF("str_to_map", GenericUDFStringToMap.class); system.registerGenericUDF("translate", GenericUDFTranslate.class); + system.registerGenericUDF("validate_acid_sort_order", GenericUDFValidateAcidSortOrder.class); system.registerGenericUDF(UNARY_PLUS_FUNC_NAME, GenericUDFOPPositive.class); system.registerGenericUDF(UNARY_MINUS_FUNC_NAME, GenericUDFOPNegative.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 15c14c9..c270507 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -128,7 +128,7 @@ public class HiveSplitGenerator extends InputInitializer { MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload()); this.conf = TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes()); - + this.jobConf = new JobConf(conf); // Read all credentials into the credentials instance stored in JobConf. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java index 7f8bd22..33d723a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; @@ -33,10 +34,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -160,43 +166,161 @@ public class SplitGrouper { return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true, locationProvider); } - /** Generate groups of splits, separated by schema evolution boundaries */ - public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, - Configuration conf, - InputSplit[] splits, - float waves, int availableSlots, - String inputName, - boolean groupAcrossFiles, - SplitLocationProvider locationProvider) throws - Exception { - - MapWork work = populateMapWork(jobConf, inputName); + /** + * Generate groups of 
splits, separated by schema evolution boundaries + * OR + * When used from compactor, group splits based on the bucket number of the input files + * (in this case, splits for same logical bucket but different schema, end up in same group) + */ + public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits, + float waves, int availableSlots, String inputName, boolean groupAcrossFiles, + SplitLocationProvider locationProvider) throws Exception { + MapWork mapWork = populateMapWork(jobConf, inputName); // ArrayListMultimap is important here to retain the ordering for the splits. - Multimap<Integer, InputSplit> bucketSplitMultiMap = - ArrayListMultimap.<Integer, InputSplit> create(); + Multimap<Integer, InputSplit> schemaGroupedSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create(); + + if (HiveConf.getVar(jobConf, HiveConf.ConfVars.SPLIT_GROUPING_MODE).equalsIgnoreCase("compactor")) { + List<Path> paths = Utilities.getInputPathsTez(jobConf, mapWork); + for (Path path : paths) { + List<String> aliases = mapWork.getPathToAliases().get(path); + if ((aliases != null) && (aliases.size() == 1)) { + Operator<? extends OperatorDesc> op = mapWork.getAliasToWork().get(aliases.get(0)); + if ((op != null) && (op instanceof TableScanOperator)) { + TableScanOperator tableScan = (TableScanOperator) op; + if (!tableScan.getConf().isTranscationalTable()) { + String splitPath = getFirstSplitPath(splits); + String errorMessage = + "Compactor split grouping is enabled only for transactional tables. Please check the path: " + + splitPath; + LOG.error(errorMessage); + throw new RuntimeException(errorMessage); + } + } + } + } + /** + * The expectation is that each InputSplit is a {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit} + * wrapping an OrcSplit. So group these splits by bucketId and within each bucketId, sort by writeId, stmtId, + * rowIdOffset or splitStart. For 'original' splits (w/o acid meta cols in the file) SyntheticBucketProperties + * should always be there and so rowIdOffset is there. For 'native' acid files, OrcSplit doesn't have + * the 1st rowid in the split, so splitStart is used to sort. This should achieve the required sorting invariance + * (sort by: writeId, stmtId, rowIdOffset within each bucket) needed for Acid tables. + * See: {@link org.apache.hadoop.hive.ql.io.AcidInputFormat} + * Create a TezGroupedSplit for each bucketId and return. + * TODO: Are there any other config values (split size etc) that can override this per writer split grouping? 
+ */ + return getCompactorSplitGroups(splits, conf); + } int i = 0; InputSplit prevSplit = null; for (InputSplit s : splits) { - // this is the bit where we make sure we don't group across partition - // schema boundaries - if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) { + // this is the bit where we make sure we don't group across partition schema boundaries + if (schemaEvolved(s, prevSplit, groupAcrossFiles, mapWork)) { ++i; prevSplit = s; } - bucketSplitMultiMap.put(i, s); + schemaGroupedSplitMultiMap.put(i, s); } LOG.info("# Src groups for split generation: " + (i + 1)); - // group them into the chunks we want Multimap<Integer, InputSplit> groupedSplits = - this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider); - + this.group(jobConf, schemaGroupedSplitMultiMap, availableSlots, waves, locationProvider); return groupedSplits; } + + // Returns the path of the first split in this list for logging purposes + private String getFirstSplitPath(InputSplit[] splits) { + if (splits.length == 0) { + throw new RuntimeException("The list of splits provided for grouping is empty."); + } + Path splitPath = ((FileSplit) splits[0]).getPath(); + + return splitPath.toString(); + } /** + * Takes a list of {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit}s + * and groups them for Acid Compactor, creating one TezGroupedSplit per bucket number. + */ + Multimap<Integer, InputSplit> getCompactorSplitGroups(InputSplit[] rawSplits, Configuration conf) { + // Note: For our case, this multimap will essentially contain one value (one TezGroupedSplit) per key + Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create(); + HiveInputFormat.HiveInputSplit[] splits = new HiveInputFormat.HiveInputSplit[rawSplits.length]; + int i = 0; + for (InputSplit is : rawSplits) { + splits[i++] = (HiveInputFormat.HiveInputSplit) is; + } + Arrays.sort(splits, new ComparatorCompactor(conf)); + TezGroupedSplit tgs = null; + int previousWriterId = Integer.MIN_VALUE; + Path rootDir = null; + for (i = 0; i < splits.length; i++) { + int writerId = ((OrcSplit) splits[i].getInputSplit()).getBucketId(); + if (rootDir == null) { + rootDir = ((OrcSplit) splits[i].getInputSplit()).getRootDir(); + } + Path rootDirFromCurrentSplit = ((OrcSplit) splits[i].getInputSplit()).getRootDir(); + // These splits should belong to the same partition + assert rootDir == rootDirFromCurrentSplit; + if (writerId != previousWriterId) { + // Create a new grouped split for this writerId + tgs = new TezGroupedSplit(1, "org.apache.hadoop.hive.ql.io.HiveInputFormat", null, null); + bucketSplitMultiMap.put(writerId, tgs); + } + tgs.addSplit(splits[i]); + previousWriterId = writerId; + } + return bucketSplitMultiMap; + } + + static class ComparatorCompactor implements Comparator<HiveInputFormat.HiveInputSplit> { + private Configuration conf; + private ComparatorCompactor(Configuration conf) { + this.conf = conf; + } + + @Override + public int compare(HiveInputFormat.HiveInputSplit h1, HiveInputFormat.HiveInputSplit h2) { + //sort: bucketId,writeId,stmtId,rowIdOffset,splitStart + if(h1 == h2) { + return 0; + } + OrcSplit o1 = (OrcSplit)h1.getInputSplit(); + OrcSplit o2 = (OrcSplit)h2.getInputSplit(); + try { + o1.parse(conf); + o2.parse(conf); + } catch(IOException ex) { + throw new RuntimeException(ex); + } + // Note: this is the bucket number as seen in the file name. + // Hive 3.0 encodes a bunch of info in the Acid schema's bucketId attribute. 
+ // See: {@link org.apache.hadoop.hive.ql.io.BucketCodec.V1} for details. + if(o1.getBucketId() != o2.getBucketId()) { + return o1.getBucketId() < o2.getBucketId() ? -1 : 1; + } + if(o1.getWriteId() != o2.getWriteId()) { + return o1.getWriteId() < o2.getWriteId() ? -1 : 1; + } + if(o1.getStatementId() != o2.getStatementId()) { + return o1.getStatementId() < o2.getStatementId() ? -1 : 1; + } + long rowOffset1 = o1.getSyntheticAcidProps() == null ? 0 : o1.getSyntheticAcidProps().getRowIdOffset(); + long rowOffset2 = o2.getSyntheticAcidProps() == null ? 0 : o2.getSyntheticAcidProps().getRowIdOffset(); + if(rowOffset1 != rowOffset2) { + //if 2 splits are from the same file (delta/base in fact), they either both have syntheticAcidProps or both do not + return rowOffset1 < rowOffset2 ? -1 : 1; + } + if(o1.getStart() != o2.getStart()) { + return o1.getStart() < o2.getStart() ? -1 : 1; + } + throw new RuntimeException("Found 2 equal splits: " + o1 + " and " + o2); + } + } + + /** * get the size estimates for each bucket in tasks. This is used to make sure * we allocate the head room evenly */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 5dbf634..9b51847 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1553,9 +1553,13 @@ public class AcidUtils { } public static boolean isFullAcidScan(Configuration conf) { - if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)) return false; + if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)) { + return false; + } int propInt = conf.getInt(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, -1); - if (propInt == -1) return true; + if (propInt == -1) { + return true; + } AcidOperationalProperties props = AcidOperationalProperties.parseInt(propInt); return !props.isInsertOnly(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index fbb931c..7c4bc4d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -1230,9 +1230,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ else { AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX, parent.getFileSystem(conf)); - assert pd.getMinWriteId() == pd.getMaxWriteId() : - "This a delta with raw non acid schema, must be result of single write, no compaction: " - + splitPath; return new TransactionMetaData(pd.getMinWriteId(), parent, pd.getStatementId()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 6d4578e..2255f8b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -634,7 +634,7 @@ public class OrcRecordUpdater implements RecordUpdater { } catch (CharacterCodingException e) { throw new IllegalArgumentException("Bad string encoding for " + OrcRecordUpdater.ACID_KEY_INDEX_NAME, e); - } + } RecordIdentifier[] result = new RecordIdentifier[stripes.length]; for(int i=0; i < stripes.length; ++i) { if (stripes[i].length() != 0) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 4d55592..61e7558 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.ColumnarSplit; import org.apache.hadoop.hive.ql.io.LlapAwareSplit; @@ -64,6 +65,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit private long projColsUncompressedSize; private transient Object fileKey; private long fileLen; + private transient long writeId = 0; + private transient int bucketId = 0; + private transient int stmtId = 0; /** * This contains the synthetic ROW__ID offset and bucket properties for original file splits in an ACID table. @@ -306,7 +310,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit /** * Used for generating synthetic ROW__IDs for reading "original" files. */ - static final class OffsetAndBucketProperty { + public static final class OffsetAndBucketProperty { private final long rowIdOffset; private final int bucketProperty; private final long syntheticWriteId; @@ -328,6 +332,38 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit return syntheticWriteId; } } + + /** + * Note: this is the write id as seen in the file name that contains this split + * For files that have min/max writeId, this is the starting one. + * @return + */ + public long getWriteId() { + return writeId; + } + + public int getStatementId() { + return stmtId; + } + + /** + * Note: this is the bucket number as seen in the file name that contains this split. + * Hive 3.0 encodes a bunch of info in the Acid schema's bucketId attribute. + * See: {@link org.apache.hadoop.hive.ql.io.BucketCodec.V1} for details. 
+ * @return + */ + public int getBucketId() { + return bucketId; + } + + public void parse(Configuration conf) throws IOException { + OrcRawRecordMerger.TransactionMetaData tmd = + OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(getPath(), rootDir, conf); + writeId = tmd.syntheticWriteId; + stmtId = tmd.statementId; + AcidOutputFormat.Options opt = AcidUtils.parseBaseOrDeltaBucketFilename(getPath(), conf); + bucketId = opt.getBucketId(); + } @Override public String toString() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index db3b427..adfa431 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -2197,6 +2197,12 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { String type = unescapeSQLString(ast.getChild(0).getText()).toLowerCase(); + if (type.equalsIgnoreCase("minor") && HiveConf.getBoolVar(conf, ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { + throw new SemanticException( + "Minor compaction is not currently supported for query based compaction (enabled by setting: " + + ConfVars.COMPACTOR_CRUD_QUERY_BASED + " to true)."); + } + if (!type.equals("minor") && !type.equals("major")) { throw new SemanticException(ErrorMsg.INVALID_COMPACTION_TYPE.getMsg()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index dc05e19..cde47da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.UUID; import java.util.regex.Matcher; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.DriverUtils; import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; @@ -69,6 +71,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.session.SessionState; @@ -232,6 +235,23 @@ public class CompactorMR { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) { throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true"); } + + /** + * Run major compaction in a HiveQL query (compaction for MM tables handled in runMmCompaction method). + * TODO: + * 1. A good way to run minor compaction (currently disabled when this config is enabled) + * 2. 
More generic approach to collecting files in the same logical bucket to compact within the same task + * (currently we're using Tez split grouping). + */ + if (!AcidUtils.isInsertOnlyTable(t.getParameters()) && HiveConf.getBoolVar(conf, + ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { + if (ci.isMajorCompaction()) { + runCrudCompaction(conf, t, p, sd, writeIds, ci); + return; + } else { + throw new RuntimeException("Query based compaction is not currently supported for minor compactions"); + } + } if (AcidUtils.isInsertOnlyTable(t.getParameters())) { if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { @@ -318,6 +338,85 @@ public class CompactorMR { su.gatherStats(); } + /** + * + * @param conf + * @param t + * @param p + * @param sd (this is the resolved StorageDescriptor, i.e. resolved to table or partition) + * @param writeIds (valid write ids used to filter rows while they're being read for compaction) + * @param ci + * @throws IOException + */ + private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, + CompactionInfo ci) throws IOException { + AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters())); + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(false), false, + t.getParameters()); + int deltaCount = dir.getCurrentDirectories().size(); + int origCount = dir.getOriginalFiles().size(); + if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) { + LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(), dir + .getBaseDirectory(), deltaCount, origCount); + return; + } + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, false); + // Set up the session for driver. + HiveConf conf = new HiveConf(hiveConf); + conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + /** + * For now, we will group splits on tez so that we end up with all bucket files, + * with same bucket number in one map task. 
+ */ + conf.set(ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); + String tmpPrefix = t.getDbName() + "_tmp_compactor_" + t.getTableName() + "_"; + String tmpTableName = tmpPrefix + System.currentTimeMillis(); + long compactorTxnId = CompactorMap.getCompactorTxnId(conf); + try { + // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234 + String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, t, sd); + LOG.info("Running major compaction query into temp table with create definition: {}", query); + try { + DriverUtils.runOnDriver(conf, user, sessionState, query); + } catch (Exception ex) { + Throwable cause = ex; + while (cause != null && !(cause instanceof AlreadyExistsException)) { + cause = cause.getCause(); + } + if (cause == null) { + throw new IOException(ex); + } + } + query = buildCrudMajorCompactionQuery(conf, t, p, tmpTableName); + LOG.info("Running major compaction via query: {}", query); + /** + * This will create bucket files like: + * db/db_tmp_compactor_tbl_1234/00000_0 + * db/db_tmp_compactor_tbl_1234/00001_0 + */ + DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId); + /** + * This achieves a final layout like (wid is the highest valid write id for this major compaction): + * db/tbl/ptn/base_wid/bucket_00000 + * db/tbl/ptn/base_wid/bucket_00001 + */ + org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); + String tmpLocation = tempTable.getSd().getLocation(); + commitCrudMajorCompaction(t, tmpLocation, tmpTableName, sd.getLocation(), conf, writeIds, compactorTxnId); + } catch (HiveException e) { + LOG.error("Error doing query based major compaction", e); + throw new IOException(e); + } finally { + try { + DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName); + } catch (HiveException e) { + LOG.error("Unable to drop temp table {} which was created for running major compaction", tmpTableName); + LOG.error(ExceptionUtils.getStackTrace(e)); + } + } + } + private void runMmCompaction(HiveConf conf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException { LOG.debug("Going to delete directories for aborted transactions for MM table " @@ -376,8 +475,7 @@ public class CompactorMR { } } } - - String query = buildMmCompactionQuery(driverConf, t, p, tmpTableName); + String query = buildMmCompactionQuery(conf, t, p, tmpTableName); LOG.info("Compacting a MM table via " + query); long compactorTxnId = CompactorMap.getCompactorTxnId(conf); DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId); @@ -393,6 +491,103 @@ public class CompactorMR { private String generateTmpPath(StorageDescriptor sd) { return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString(); } + + /** + * Note on ordering of rows in the temp table: + * We need each final bucket file sorted by original write id (ascending), bucket (ascending) and row id (ascending). + * (current write id will be the same as original write id). + * We achieve this ordering via a custom split grouper for the compactor. + * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars.SPLIT_GROUPING_MODE} for the config description. + * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups} for details on the mechanism.
+ */ + private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd) { + StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" ("); + // Acid virtual columns + query.append( + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<"); + List<FieldSchema> cols = t.getSd().getCols(); + boolean isFirst = true; + // Actual columns + for (FieldSchema col : cols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("`").append(col.getName()).append("` ").append(":").append(col.getType()); + } + query.append(">)"); + query.append(" stored as orc"); + query.append(" tblproperties ('transactional'='false')"); + return query.toString(); + } + + private String buildCrudMajorCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) { + String fullName = t.getDbName() + "." + t.getTableName(); + String query = "insert into table " + tmpName + " "; + String filter = ""; + if (p != null) { + filter = filter + " where "; + List<String> vals = p.getValues(); + List<FieldSchema> keys = t.getPartitionKeys(); + assert keys.size() == vals.size(); + for (int i = 0; i < keys.size(); ++i) { + filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'"); + } + } + query += " select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, " + + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT("; + List<FieldSchema> cols = t.getSd().getCols(); + for (int i = 0; i < cols.size(); ++i) { + query += (i == 0 ? "'" : ", '") + cols.get(i).getName() + "', " + cols.get(i).getName(); + } + query += ") from " + fullName + filter; + return query; + } + + /** + * Move and rename bucket files from the temp table (tmpTableName) to the new base path under the source table/ptn. + * Since the temp table is a non-transactional table, it has file names in the "original" format. + * Also, due to split grouping in {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups}, + * we will end up with one file per bucket. + */ + private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, Configuration conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException { + Path fromPath = new Path(from); + Path toPath = new Path(to); + Path tmpTablePath = new Path(fromPath, tmpTableName); + FileSystem fs = fromPath.getFileSystem(conf); + // Assume the high watermark can be used as maximum transaction ID. + long maxTxn = actualWriteIds.getHighWatermark(); + // Get a base_wid path which will be the new compacted base + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false) + .maximumWriteId(maxTxn).bucket(0).statementId(-1); + Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); + if (!fs.exists(fromPath)) { + LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir); + fs.mkdirs(newBaseDir); + return; + } + LOG.info("Moving contents of {} to {}", tmpTablePath, to); + /** + * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on + * TODO/ToThink: + * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination? 
+
+  /**
+   * Move and rename bucket files from the temp table (tmpTableName) to the new base path under the source table/ptn.
+   * Since the temp table is a non-transactional table, it has file names in the "original" format.
+   * Also, due to split grouping in {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups},
+   * we will end up with one file per bucket.
+   */
+  private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, Configuration conf,
+      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException {
+    Path fromPath = new Path(from);
+    Path toPath = new Path(to);
+    Path tmpTablePath = new Path(fromPath, tmpTableName);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the high watermark can be used as maximum transaction ID.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    // Get a base_wid path which will be the new compacted base
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false)
+        .maximumWriteId(maxTxn).bucket(0).statementId(-1);
+    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir);
+      fs.mkdirs(newBaseDir);
+      return;
+    }
+    LOG.info("Moving contents of {} to {}", tmpTablePath, to);
+    /**
+     * Currently maps the file named 0000_0 to bucket_00000, 0000_1 to bucket_00001, and so on.
+     * TODO/ToThink:
+     * Q. Can the file named 0000_0 under the temp table be deterministically renamed to bucket_00000 in the
+     * destination?
+     */
+    FileStatus[] children = fs.listStatus(fromPath);
+    for (FileStatus filestatus : children) {
+      String originalFileName = filestatus.getPath().getName();
+      // Only move files in the "original" name format; this guard may be redundant, since the non-transactional
+      // temp table should contain only such files.
+      if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
+        int bucketId = AcidUtils.parseBucketId(filestatus.getPath());
+        options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
+            .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId);
+        Path finalBucketFile = AcidUtils.createFilename(toPath, options);
+        fs.rename(filestatus.getPath(), finalBucketFile);
+      }
+    }
+    fs.delete(fromPath, true);
+  }

   private String buildMmCompactionCtQuery(
       String fullName, Table t, StorageDescriptor sd, String location) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a0df82c..a37c983 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -96,6 +96,12 @@ public class Initiator extends MetaStoreCompactorThread {
     LOG.debug("Found " + potentials.size() + " potential compactions, "
         + "checking to see if we should compact any of them");
     for (CompactionInfo ci : potentials) {
+      // Disable minor compaction for the query based compactor
+      if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+        LOG.debug("Not compacting: " + ci.getFullPartitionName()
+            + ", as query based compaction currently does not support minor compactions.");
+        continue;
+      }
       LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
       try {
         Table t = resolveTable(ci);
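[Editor's note] The rename loop in commitCrudMajorCompaction turns each "original"-format temp-table file into a final base bucket file. A standalone sketch of just that name mapping, under stated assumptions: the regex is a simplified stand-in for AcidUtils.ORIGINAL_PATTERN, and the base directory name ignores the visibility suffix that AcidUtils.createFilename may add for the compactor transaction:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    final class BucketFileNameSketch {
      // Simplified stand-in for AcidUtils.ORIGINAL_PATTERN, e.g. "000001_0".
      private static final Pattern ORIGINAL = Pattern.compile("(\\d+)_\\d+");

      // Maps an original-format file name to its compacted location, e.g.
      // toBaseBucketFile("000001_0", 17) -> "base_17/bucket_00001".
      static String toBaseBucketFile(String originalName, long maxWriteId) {
        Matcher m = ORIGINAL.matcher(originalName);
        if (!m.matches()) {
          throw new IllegalArgumentException("not an original file name: " + originalName);
        }
        int bucketId = Integer.parseInt(m.group(1));
        return String.format("base_%d/bucket_%05d", maxWriteId, bucketId);
      }
    }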
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java
new file mode 100644
index 0000000..757a366
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java
@@ -0,0 +1,100 @@
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * GenericUDFValidateAcidSortOrder. Validates that rows within one bucket arrive in the Acid sort order
+ * (write id, bucket, row id), throwing a HiveException when an out-of-order row is seen.
+ */
+@Description(name = "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId)",
+    value = "_FUNC_(writeId, bucketId, rowId) - returns 0 if the current row is in the right acid sort order "
+        + "compared to the previous row")
+public class GenericUDFValidateAcidSortOrder extends GenericUDF {
+  public static final String UDF_NAME = "validate_acid_sort_order";
+  private transient PrimitiveCategory[] inputTypes = new PrimitiveCategory[3];
+  private transient Converter[] converters = new Converter[3];
+  private final LongWritable output = new LongWritable();
+  // See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups}.
+  // Each writer handles only one logical bucket (i.e. all files with the same bucket number end up in one writer).
+  private int bucketNumForWriter = -1;
+  private WriteIdRowId previousWriteIdRowId = null;
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    checkArgsSize(arguments, 3, 3);
+    checkArgPrimitive(arguments, 0);
+    checkArgPrimitive(arguments, 1);
+    checkArgPrimitive(arguments, 2);
+    obtainLongConverter(arguments, 0, inputTypes, converters);
+    obtainIntConverter(arguments, 1, inputTypes, converters);
+    obtainLongConverter(arguments, 2, inputTypes, converters);
+    return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+  }
+
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    long writeId = getLongValue(arguments, 0, converters);
+    int bucketProperty = getIntValue(arguments, 1, converters);
+    int bucketNum = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+    long rowId = getLongValue(arguments, 2, converters);
+    if (bucketNumForWriter < 0) {
+      bucketNumForWriter = bucketNum;
+    } else if (bucketNumForWriter != bucketNum) {
+      throw new HiveException("One writer is supposed to handle only one bucket. We saw these 2 different buckets: "
+          + bucketNumForWriter + " and " + bucketNum);
+    }
+    // Verify sort order for this new row
+    WriteIdRowId current = new WriteIdRowId(bucketProperty, writeId, rowId);
+    if (previousWriteIdRowId != null && current.compareTo(previousWriteIdRowId) <= 0) {
+      throw new HiveException("Wrong sort order of Acid rows detected for the rows: " + previousWriteIdRowId
+          + " and " + current);
+    }
+    previousWriteIdRowId = current;
+    output.set(0);
+    return output;
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return getStandardDisplayString("validate_acid_sort_order", children);
+  }
+
+  static final class WriteIdRowId implements Comparable<WriteIdRowId> {
+    final int bucketProperty;
+    final long writeId;
+    final long rowId;
+
+    WriteIdRowId(int bucketProperty, long writeId, long rowId) {
+      this.bucketProperty = bucketProperty;
+      this.writeId = writeId;
+      this.rowId = rowId;
+    }
+
+    @Override
+    public int compareTo(WriteIdRowId other) {
+      if (this.bucketProperty != other.bucketProperty) {
+        return this.bucketProperty < other.bucketProperty ? -1 : 1;
+      }
+      if (this.writeId != other.writeId) {
+        return this.writeId < other.writeId ? -1 : 1;
+      }
+      if (this.rowId != other.rowId) {
+        return this.rowId < other.rowId ? -1 : 1;
+      }
+      return 0;
+    }
+  }
+}
\ No newline at end of file
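[Editor's note] The Comparable contract of WriteIdRowId is what gives "wrong sort order" its precise meaning: bucket property first, then write id, then row id. A small JUnit-style sketch of that contract; the test class is hypothetical (placed in the same package, since WriteIdRowId is package-private), and 536870912 is one BucketCodec-encoded bucket property value used purely as sample data:

    package org.apache.hadoop.hive.ql.udf.generic;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;

    import org.junit.Test;

    public class TestWriteIdRowIdOrder {
      @Test
      public void ordersByBucketThenWriteIdThenRowId() {
        int bucket = 536870912; // one encoded bucket property; all three rows share the bucket
        GenericUDFValidateAcidSortOrder.WriteIdRowId a =
            new GenericUDFValidateAcidSortOrder.WriteIdRowId(bucket, 1L, 0L);
        GenericUDFValidateAcidSortOrder.WriteIdRowId b =
            new GenericUDFValidateAcidSortOrder.WriteIdRowId(bucket, 1L, 1L);
        GenericUDFValidateAcidSortOrder.WriteIdRowId c =
            new GenericUDFValidateAcidSortOrder.WriteIdRowId(bucket, 2L, 0L);
        assertTrue(a.compareTo(b) < 0); // same write id: row id breaks the tie
        assertTrue(b.compareTo(c) < 0); // higher write id sorts later, regardless of row id
        assertEquals(0, a.compareTo(a));
      }
    }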
diff --git a/ql/src/test/results/clientpositive/show_functions.q.out b/ql/src/test/results/clientpositive/show_functions.q.out
index c9716e9..1d2cb1c 100644
--- a/ql/src/test/results/clientpositive/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/show_functions.q.out
@@ -279,6 +279,7 @@ unhex
 unix_timestamp
 upper
 uuid
+validate_acid_sort_order
 var_pop
 var_samp
 variance
@@ -401,6 +402,7 @@ date_format
 date_sub
 datediff
 to_date
+validate_acid_sort_order
 PREHOOK: query: SHOW FUNCTIONS '***'
 PREHOOK: type: SHOWFUNCTIONS
 POSTHOOK: query: SHOW FUNCTIONS '***'
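[Editor's note] For reference, a sketch of exercising the new path end to end. The table name is hypothetical, and hive.compactor.crud.query.based must be in effect where the Initiator/Worker run; it is shown as a session-level set only for brevity:

    set hive.compactor.crud.query.based=true;

    create table acid_tbl (a int, b int) clustered by (a) into 2 buckets
      stored as orc tblproperties ('transactional'='true');
    insert into acid_tbl values (1, 2), (1, 3), (2, 2);
    delete from acid_tbl where b = 2;
    alter table acid_tbl compact 'major';

    -- the UDF registered by this patch now shows up:
    show functions like 'validate_acid_sort_order';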