http://git-wip-us.apache.org/repos/asf/hive/blob/28b24dbf/upgrade-acid/pre-upgrade/pom.xml ---------------------------------------------------------------------- diff --git a/upgrade-acid/pre-upgrade/pom.xml b/upgrade-acid/pre-upgrade/pom.xml new file mode 100644 index 0000000..da73d31 --- /dev/null +++ b/upgrade-acid/pre-upgrade/pom.xml @@ -0,0 +1,284 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + + <parent> + <groupId>org.apache.hive</groupId> + <artifactId>hive-upgrade-acid</artifactId> + <version>4.0.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + + <modelVersion>4.0.0</modelVersion> + <!--this module is added to parent pom so that it builds and releases with the reset of Hive--> + <artifactId>hive-pre-upgrade</artifactId> + <name>Hive Pre Upgrade Acid</name> + <packaging>jar</packaging> + + <properties> + <hive.path.to.root>../..</hive.path.to.root> + + <!-- Test Properties --> + <test.conf.dir>${project.build.directory}/testconf</test.conf.dir> + <test.log4j.scheme>file://</test.log4j.scheme> + <log4j.conf.dir>${project.basedir}/src/test/resources</log4j.conf.dir> + <test.tmp.dir>${project.build.directory}/tmp</test.tmp.dir> + <test.warehouse.dir>${project.build.directory}/warehouse</test.warehouse.dir> + <test.warehouse.scheme>file://</test.warehouse.scheme> + <test.forkcount>1</test.forkcount> + <skipITests>true</skipITests> + <hdp.hive.version>2.3.3</hdp.hive.version> + <hdp.hadoop.version>2.7.2</hdp.hadoop.version> + </properties> + <dependencies> + <!--scope is 'provided' for all. The UpgradeTool is provided as part of Hive 3.x and + supports 2 modes - preUpgrade which runs with 2.x jars on the classpath and postUpgrade + which runs with 3.x jars. 
'provided' should pull these jars for compile/test but not + for packaging.--> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.2</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + <version>${hdp.hive.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hdp.hive.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hdp.hadoop.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <!-- w/o this we get this, even though mapreduce.framework.name=mapred.job.tracker=local + https://stackoverflow.com/questions/24096834/org-apache-hadoop-mapred-localclientprotocolprovider-not-found + + 2018-05-23T13:01:50,122 ERROR [main] exec.Task: Job Submission failed with exception 'java.io.IOException(Cannot initialize Cluster. Please check yo\ +ur configuration for mapreduce.framework.name and the correspond server addresses.)' +java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. + at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120) + at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82) + at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75) + at org.apache.hadoop.mapred.JobClient.init(JobClient.java:470) + at org.apache.hadoop.mapred.JobClient.<init>(JobClient.java:449) + at org.apache.hadoop.hive.ql.exec.mr.ExecDriver.execute(ExecDriver.java:369) + at org.apache.hadoop.hive.ql.exec.mr.MapRedTask.execute(MapRedTask.java:151) + at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:199) + at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100) + at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:2183) + at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1839) + at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1526) + at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237) + + --> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <version>2.7.2</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>1.3.3</version> + <scope>provided</scope> + </dependency> + </dependencies> + + + <build> + <resources> + <resource> + <directory>${basedir}/src/main/resources</directory> + <includes> + <include>package.jdo</include> + </includes> + </resource> + </resources> + + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>${maven.antrun.plugin.version}</version> + <dependencies> + <dependency> + <groupId>ant-contrib</groupId> + <artifactId>ant-contrib</artifactId> + <version>${ant.contrib.version}</version> + <exclusions> + <exclusion> + <groupId>ant</groupId> + <artifactId>ant</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>${maven.checkstyle.plugin.version}</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + 
<artifactId>exec-maven-plugin</artifactId> + <version>${maven.exec.plugin.version}</version> + </plugin> + </plugins> + </pluginManagement> + <plugins> + <!-- plugins are always listed in sorted order by groupId, artifectId --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>setup-test-dirs</id> + <phase>process-test-resources</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <delete dir="${test.conf.dir}" /> + <delete dir="${test.tmp.dir}" /> + <delete dir="${test.warehouse.dir}" /> + <mkdir dir="${test.tmp.dir}" /> + <mkdir dir="${test.warehouse.dir}" /> + <mkdir dir="${test.conf.dir}" /> + <!-- copies hive-site.xml so it can be modified --> + <copy todir="${test.conf.dir}"> + <fileset dir="${basedir}/${hive.path.to.root}/data/conf/"/> + </copy> + </target> + </configuration> + </execution> + <execution> + <id>setup-metastore-scripts</id> + <phase>process-test-resources</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <mkdir dir="${test.tmp.dir}/scripts/metastore" /> + <copy todir="${test.tmp.dir}/scripts/metastore"> + <fileset dir="${basedir}/${hive.path.to.root}/metastore/scripts/"/> + </copy> + <mkdir dir="${test.tmp.dir}/scripts/metastore/upgrade" /> + <copy todir="${test.tmp.dir}/scripts/metastore/upgrade"> + <fileset dir="${basedir}/${hive.path.to.root}/standalone-metastore/metastore-server/src/main/sql/"/> + </copy> + </target> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>2.20.1</version> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + <configuration> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <reuseForks>false</reuseForks> + <argLine>-Xmx2048m</argLine> + <failIfNoTests>false</failIfNoTests> + <systemPropertyVariables> + <log4j.debug>true</log4j.debug> + <java.io.tmpdir>${test.tmp.dir}</java.io.tmpdir> + <test.tmp.dir>${test.tmp.dir}</test.tmp.dir> + <hive.in.test>true</hive.in.test> + </systemPropertyVariables> + <additionalClasspathElements> + <additionalClasspathElement>${log4j.conf.dir}</additionalClasspathElement> + </additionalClasspathElements> + <skipITs>${skipITests}</skipITs> <!-- set this to false to run these tests --> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven.surefire.version}</version> + <configuration> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <reuseForks>false</reuseForks> + <forkCount>${test.forkcount}</forkCount> + <argLine>-Xmx2048m</argLine> + <failIfNoTests>false</failIfNoTests> + <systemPropertyVariables> + <build.dir>${project.build.directory}</build.dir> + <datanucleus.schema.autoCreateAll>true</datanucleus.schema.autoCreateAll> + <derby.version>${derby.version}</derby.version> + <derby.stream.error.file>${test.tmp.dir}/derby.log</derby.stream.error.file> + <!--next line needed to get hive.log--> + <log4j.configurationFile>${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties</log4j.configurationFile> + <log4j.debug>true</log4j.debug> + <java.io.tmpdir>${test.tmp.dir}</java.io.tmpdir> + <!-- + use 'memory' to make it run faster + 
<javax.jdo.option.ConnectionURL>jdbc:derby:memory:${test.tmp.dir}/junit_metastore_db;create=true</javax.jdo.option.ConnectionURL>--> + <javax.jdo.option.ConnectionURL>jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true</javax.jdo.option.ConnectionURL> + <metastore.schema.verification>false</metastore.schema.verification> + <test.tmp.dir>${test.tmp.dir}</test.tmp.dir> + <metastore.warehouse.dir>${test.warehouse.scheme}${test.warehouse.dir}</metastore.warehouse.dir> + <!-- both default to 'local' + <mapred.job.tracker>local</mapred.job.tracker> + <mapreduce.framework.name>local</mapreduce.framework.name>--> + </systemPropertyVariables> + <additionalClasspathElements> + <additionalClasspathElement>${log4j.conf.dir}</additionalClasspathElement> + <additionalClasspathElement>${test.conf.dir}</additionalClasspathElement> + <!--puts hive-site.xml on classpath - w/o HMS tables are not created--> + <additionalClasspathElement>${test.conf.dir}/conf</additionalClasspathElement> + </additionalClasspathElements> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file
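Since every dependency above is 'provided', the hive-pre-upgrade jar only works when the target 2.x cluster's Hive/Hadoop jars and its hive-site.xml are already on the classpath. As a rough sketch only (the wrapper class name and output path below are hypothetical, not part of this patch), the PreUpgradeTool added in the next file can be driven programmatically the same way its command line is:

    // Hypothetical driver; assumes 2.x Hive/Hadoop jars and hive-site.xml are on the classpath.
    public class RunPreUpgrade {
      public static void main(String[] args) throws Exception {
        // Writes compacts_<timestamp>.sql to /tmp/pre-upgrade; pass "-execute" as well to have
        // the tool schedule the compactions itself instead of only generating the script.
        org.apache.hadoop.hive.upgrade.acid.PreUpgradeTool.main(
            new String[] {"-location", "/tmp/pre-upgrade"});
      }
    }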
http://git-wip-us.apache.org/repos/asf/hive/blob/28b24dbf/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java ---------------------------------------------------------------------- diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java new file mode 100644 index 0000000..2547f25 --- /dev/null +++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.upgrade.acid; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.HiveMetaHookLoader; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.common.util.HiveVersionInfo; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; 
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.escapeSQLString;
+
+/**
+ * This utility is designed to help with upgrading Hive 2.x to Hive 3.0. The on-disk layout for
+ * transactional tables has changed in 3.0 and requires pre-processing before upgrade to ensure
+ * they are readable by Hive 3.0. Some transactional tables (identified by this utility) require
+ * Major compaction to be run on them before upgrading to 3.0. Once this compaction starts, no
+ * more update/delete/merge statements may be executed on these tables until the upgrade is finished.
+ *
+ * Additionally, a new type of transactional table was added in 3.0 - insert-only tables. These
+ * tables support ACID semantics and work with any Input/OutputFormat. Any managed table may
+ * be made an insert-only transactional table. These tables don't support Update/Delete/Merge commands.
+ *
+ * Note that depending on the number of tables/partitions and the amount of data in them, compactions
+ * may take a significant amount of time and resources. The script output by this utility includes
+ * some heuristics that may help estimate the time required. If no script is produced, no action
+ * is needed. For compactions to run, an instance of the standalone Hive Metastore must be running.
+ * Please make sure hive.compactor.worker.threads is sufficiently high - this specifies the limit
+ * of concurrent compactions that may be run. Each compaction job is a Map-Reduce job.
+ * hive.compactor.job.queue may be used to set a YARN queue name where all compaction jobs will be
+ * submitted.
+ *
+ * The "execute" option may be supplied to have the utility automatically execute the
+ * equivalent of the generated commands.
+ *
+ * The "location" option may be supplied followed by a path to set the location for the generated
+ * scripts.
+ *
+ * Note:
+ * This utility connects to the Metastore via API. It may be necessary to set
+ * -Djavax.security.auth.useSubjectCredsOnly=false in a Kerberized environment if errors like
+ * "org.ietf.jgss.GSSException: No valid credentials provided (
+ * Mechanism level: Failed to find any Kerberos tgt)"
+ * show up after kinit.
+ *
+ * See also org.apache.hadoop.hive.ql.util.UpgradeTool in Hive 3.x.
+ */
+public class PreUpgradeTool {
+  private static final Logger LOG = LoggerFactory.getLogger(PreUpgradeTool.class);
+  private static final int PARTITION_BATCH_SIZE = 10000;
+  private final Options cmdLineOptions = new Options();
+
+  public static void main(String[] args) throws Exception {
+    PreUpgradeTool tool = new PreUpgradeTool();
+    tool.init();
+    CommandLineParser parser = new GnuParser();
+    CommandLine line;
+    String outputDir = ".";
+    boolean execute = false;
+    try {
+      line = parser.parse(tool.cmdLineOptions, args);
+    } catch (ParseException e) {
+      System.err.println("PreUpgradeTool: Parsing failed. 
Reason: " + e.getLocalizedMessage()); + printAndExit(tool); + return; + } + if (line.hasOption("help")) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("upgrade-acid", tool.cmdLineOptions); + return; + } + if(line.hasOption("location")) { + outputDir = line.getOptionValue("location"); + } + if(line.hasOption("execute")) { + execute = true; + } + LOG.info("Starting with execute=" + execute + ", location=" + outputDir); + + try { + String hiveVer = HiveVersionInfo.getShortVersion(); + LOG.info("Using Hive Version: " + HiveVersionInfo.getVersion() + " build: " + + HiveVersionInfo.getBuildVersion()); + if(!hiveVer.startsWith("2.")) { + throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer); + } + tool.prepareAcidUpgradeInternal(outputDir, execute); + } + catch(Exception ex) { + LOG.error("PreUpgradeTool failed", ex); + throw ex; + } + } + private static void printAndExit(PreUpgradeTool tool) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("upgrade-acid", tool.cmdLineOptions); + System.exit(1); + } + + private void init() { + try { + cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 2.x" + + " cluster. This requires 2.x binaries on the classpath and hive-site.xml.")); + Option exec = new Option("execute", + "Executes commands equivalent to generated scrips"); + exec.setOptionalArg(true); + cmdLineOptions.addOption(exec); + cmdLineOptions.addOption(new Option("location", true, + "Location to write scripts to. Default is CWD.")); + } + catch(Exception ex) { + LOG.error("init()", ex); + throw ex; + } + } + private static HiveMetaHookLoader getHookLoader() { + return new HiveMetaHookLoader() { + @Override + public HiveMetaHook getHook( + org.apache.hadoop.hive.metastore.api.Table tbl) { + return null; + } + }; + } + + private static IMetaStoreClient getHMS(HiveConf conf) { + UserGroupInformation loggedInUser = null; + try { + loggedInUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage()); + } + boolean secureMode = loggedInUser != null && loggedInUser.hasKerberosCredentials(); + if (secureMode) { + conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true); + } + try { + LOG.info("Creating metastore client for {}", "PreUpgradeTool"); + /* I'd rather call return RetryingMetaStoreClient.getProxy(conf, true) + which calls HiveMetaStoreClient(HiveConf, Boolean) which exists in + (at least) 2.1.0.2.6.5.0-292 and later but not in 2.1.0.2.6.0.3-8 (the HDP 2.6 release) + i.e. RetryingMetaStoreClient.getProxy(conf, true) is broken in 2.6.0*/ + return RetryingMetaStoreClient.getProxy(conf, + new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class}, + new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName()); + } catch (MetaException e) { + throw new RuntimeException("Error connecting to Hive Metastore URI: " + + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e); + } + } + + /** + * todo: change script comments to a preamble instead of a footer + */ + private void prepareAcidUpgradeInternal(String scriptLocation, boolean execute) + throws HiveException, TException, IOException { + HiveConf conf = hiveConf != null ? 
hiveConf : new HiveConf(); + boolean isAcidEnabled = isAcidEnabled(conf); + IMetaStoreClient hms = getHMS(conf); + LOG.debug("Looking for databases"); + List<String> databases = hms.getAllDatabases();//TException + LOG.debug("Found " + databases.size() + " databases to process"); + List<String> compactions = new ArrayList<>(); + final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo(); + ValidTxnList txns = null; + Hive db = null; + if(execute) { + db = Hive.get(conf); + } + + for(String dbName : databases) { + List<String> tables = hms.getAllTables(dbName); + LOG.debug("found " + tables.size() + " tables in " + dbName); + for(String tableName : tables) { + Table t = hms.getTable(dbName, tableName); + LOG.debug("processing table " + Warehouse.getQualifiedName(t)); + if(isAcidEnabled) { + //if acid is off, there can't be any acid tables - nothing to compact + if(txns == null) { + /* + This API changed from 2.x to 3.0. so this won't even compile with 3.0 + but it doesn't need to since we only run this preUpgrade + */ + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); + } + List<String> compactionCommands = + getCompactionCommands(t, conf, hms, compactionMetaInfo, execute, db, txns); + compactions.addAll(compactionCommands); + } + /*todo: handle renaming files somewhere*/ + } + } + makeCompactionScript(compactions, scriptLocation, compactionMetaInfo); + + if(execute) { + while(compactionMetaInfo.compactionIds.size() > 0) { + LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() + + " compactions to complete"); + ShowCompactResponse resp = db.showCompactions(); + for(ShowCompactResponseElement e : resp.getCompacts()) { + final String state = e.getState(); + boolean removed; + switch (state) { + case TxnStore.CLEANING_RESPONSE: + case TxnStore.SUCCEEDED_RESPONSE: + removed = compactionMetaInfo.compactionIds.remove(e.getId()); + if(removed) { + LOG.debug("Required compaction succeeded: " + e.toString()); + } + break; + case TxnStore.ATTEMPTED_RESPONSE: + case TxnStore.FAILED_RESPONSE: + removed = compactionMetaInfo.compactionIds.remove(e.getId()); + if(removed) { + LOG.warn("Required compaction failed: " + e.toString()); + } + break; + case TxnStore.INITIATED_RESPONSE: + //may flood the log + //LOG.debug("Still waiting on: " + e.toString()); + break; + case TxnStore.WORKING_RESPONSE: + LOG.debug("Still working on: " + e.toString()); + break; + default://shouldn't be any others + LOG.error("Unexpected state for : " + e.toString()); + } + } + if(compactionMetaInfo.compactionIds.size() > 0) { + try { + if (callback != null) { + callback.onWaitForCompaction(); + } + Thread.sleep(pollIntervalMs); + } catch (InterruptedException ex) { + ;//this only responds to ^C + } + } + } + } + } + + + /** + * Generates a set compaction commands to run on pre Hive 3 cluster + */ + private static void makeCompactionScript(List<String> commands, String scriptLocation, + CompactionMetaInfo compactionMetaInfo) throws IOException { + if (commands.isEmpty()) { + LOG.info("No compaction is necessary"); + return; + } + String fileName = "compacts_" + System.currentTimeMillis() + ".sql"; + LOG.debug("Writing compaction commands to " + fileName); + try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) { + //add post script + pw.println("-- Generated total of " + commands.size() + " compaction commands"); + if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) { + //to see it working in UTs + 
pw.println("-- The total volume of data to be compacted is " + + String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20))); + } + else { + pw.println("-- The total volume of data to be compacted is " + + String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30))); + } + pw.println(); + //todo: should be at the top of the file... + pw.println( + "-- Please note that compaction may be a heavyweight and time consuming process.\n" + + "-- Submitting all of these commands will enqueue them to a scheduling queue from\n" + + "-- which they will be picked up by compactor Workers. The max number of\n" + + "-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" + + "-- for the standalone metastore process. Compaction itself is a Map-Reduce job\n" + + "-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" + + "-- property if defined or 'default' if not defined. It's advisable to set the\n" + + "-- capacity of this queue appropriately"); + } + } + + private static PrintWriter createScript(List<String> commands, String fileName, + String scriptLocation) throws IOException { + FileWriter fw = new FileWriter(scriptLocation + "/" + fileName); + PrintWriter pw = new PrintWriter(fw); + for(String cmd : commands) { + pw.println(cmd + ";"); + } + return pw; + } + /** + * @return any compaction commands to run for {@code Table t} + */ + private static List<String> getCompactionCommands(Table t, HiveConf conf, + IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db, + ValidTxnList txns) throws IOException, TException, HiveException { + if(!isFullAcidTable(t)) { + return Collections.emptyList(); + } + if(t.getPartitionKeysSize() <= 0) { + //not partitioned + if(!needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo, txns)) { + return Collections.emptyList(); + } + + List<String> cmds = new ArrayList<>(); + cmds.add(getCompactionCommand(t, null)); + if(execute) { + scheduleCompaction(t, null, db, compactionMetaInfo); + } + return cmds; + } + List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1); + int batchSize = PARTITION_BATCH_SIZE; + int numWholeBatches = partNames.size()/batchSize; + List<String> compactionCommands = new ArrayList<>(); + for(int i = 0; i < numWholeBatches; i++) { + List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), + partNames.subList(i * batchSize, (i + 1) * batchSize)); + getCompactionCommands(t, partitionList, db, execute, compactionCommands, + compactionMetaInfo, conf, txns); + } + if(numWholeBatches * batchSize < partNames.size()) { + //last partial batch + List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), + partNames.subList(numWholeBatches * batchSize, partNames.size())); + getCompactionCommands(t, partitionList, db, execute, compactionCommands, + compactionMetaInfo, conf, txns); + } + return compactionCommands; + } + private static void getCompactionCommands(Table t, List<Partition> partitionList, Hive db, + boolean execute, List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo, + HiveConf conf, ValidTxnList txns) + throws IOException, TException, HiveException { + for (Partition p : partitionList) { + if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo, txns)) { + compactionCommands.add(getCompactionCommand(t, p)); + if (execute) { + scheduleCompaction(t, p, db, compactionMetaInfo); + 
} + } + } + } + private static void scheduleCompaction(Table t, Partition p, Hive db, + CompactionMetaInfo compactionMetaInfo) throws HiveException, MetaException { + String partName = p == null ? null : + Warehouse.makePartName(t.getPartitionKeys(), p.getValues()); + CompactionResponse resp = + //this gives an easy way to get at compaction ID so we can only wait for those this + //utility started + db.compact2(t.getDbName(), t.getTableName(), partName, "major", null); + if(!resp.isAccepted()) { + LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) + + " is already being compacted with id=" + resp.getId()); + } + else { + LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) + + (p == null ? "" : "/" + partName) + " with id=" + resp.getId()); + } + compactionMetaInfo.compactionIds.add(resp.getId()); + } + /** + * + * @param location - path to a partition (or table if not partitioned) dir + */ + private static boolean needsCompaction2(Path location, HiveConf conf, + CompactionMetaInfo compactionMetaInfo) throws IOException { + FileSystem fs = location.getFileSystem(conf); + FileStatus[] deltas = fs.listStatus(location, new PathFilter() { + @Override + public boolean accept(Path path) { + //checking for delete_delta is only so that this functionality can be exercised by code 3.0 + //which cannot produce any deltas with mix of update/insert events + return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_"); + } + }); + if(deltas == null || deltas.length == 0) { + //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact + //only if there are update/delete events. + return false; + } + deltaLoop: for(FileStatus delta : deltas) { + if(!delta.isDirectory()) { + //should never happen - just in case + continue; + } + FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() { + @Override + public boolean accept(Path path) { + //since this is inside a delta dir created by Hive 2.x or earlier it can only contain + //bucket_x or bucket_x__flush_length + return path.getName().startsWith("bucket_"); + } + }); + for(FileStatus bucket : buckets) { + if(bucket.getPath().getName().endsWith("_flush_length")) { + //streaming ingest dir - cannot have update/delete events + continue deltaLoop; + } + if(needsCompaction(bucket, fs)) { + //found delete events - this 'location' needs compacting + compactionMetaInfo.numberOfBytes += getDataSize(location, conf); + //todo: this is not remotely accurate if you have many (relevant) original files + return true; + } + } + } + return false; + } + /** + * + * @param location - path to a partition (or table if not partitioned) dir + */ + private static boolean needsCompaction(Path location, HiveConf conf, + CompactionMetaInfo compactionMetaInfo, ValidTxnList txns) throws IOException { + FileSystem fs = location.getFileSystem(conf); + FileStatus[] deltas = fs.listStatus(location, new PathFilter() { + @Override + public boolean accept(Path path) { + //checking for delete_delta is only so that this functionality can be exercised by code 3.0 + //which cannot produce any deltas with mix of update/insert events + return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_"); + } + }); + if(deltas == null || deltas.length == 0) { + //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact + //only if there are update/delete events. 
+ return false; + } + /*getAcidState() is smart not to return any deltas in current if there is a base that covers + * them, i.e. if they were compacted but not yet cleaned. This means re-checking if + * compaction is needed should cheap(er)*/ + AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns); + deltaLoop: for(AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) { + FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() { + @Override + public boolean accept(Path path) { + //since this is inside a delta dir created by Hive 2.x or earlier it can only contain + //bucket_x or bucket_x__flush_length + return path.getName().startsWith("bucket_"); + } + }); + for(FileStatus bucket : buckets) { + if(bucket.getPath().getName().endsWith("_flush_length")) { + //streaming ingest dir - cannot have update/delete events + continue deltaLoop; + } + if(needsCompaction(bucket, fs)) { + //found delete events - this 'location' needs compacting + compactionMetaInfo.numberOfBytes += getDataSize(location, conf); + + //if there are un-compacted original files, they will be included in compaction, so + //count at the size for 'cost' estimation later + for(HadoopShims.HdfsFileStatusWithId origFile : dir.getOriginalFiles()) { + FileStatus fileStatus = origFile.getFileStatus(); + if(fileStatus != null) { + compactionMetaInfo.numberOfBytes += fileStatus.getLen(); + } + } + return true; + } + } + } + return false; + } + + /** + * @param location - path to a partition (or table if not partitioned) dir + */ + private static long getDataSize(Path location, HiveConf conf) throws IOException { + FileSystem fs = location.getFileSystem(conf); + ContentSummary cs = fs.getContentSummary(location); + return cs.getLength(); + } + private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException { + //create reader, look at footer + //no need to check side file since it can only be in a streaming ingest delta + Reader orcReader = OrcFile.createReader(bucket.getPath(),OrcFile.readerOptions(fs.getConf()) + .filesystem(fs)); + AcidStats as = OrcAcidUtils.parseAcidStats(orcReader); + if(as == null) { + //should never happen since we are reading bucket_x written by acid write + throw new IllegalStateException("AcidStats missing in " + bucket.getPath()); + } + return as.deletes > 0 || as.updates > 0; + } + private static String getCompactionCommand(Table t, Partition p) { + StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t)); + if(t.getPartitionKeysSize() > 0) { + assert p != null : "must supply partition for partitioned table " + + Warehouse.getQualifiedName(t); + sb.append(" PARTITION("); + for (int i = 0; i < t.getPartitionKeysSize(); i++) { + sb.append(t.getPartitionKeys().get(i).getName()).append('=').append( + genPartValueString(t.getPartitionKeys().get(i).getType(), p.getValues().get(i))). 
+ append(","); + } + sb.setCharAt(sb.length() - 1, ')');//replace trailing ',' + } + return sb.append(" COMPACT 'major'").toString(); + } + + /** + * This is copy-pasted from {@link org.apache.hadoop.hive.ql.parse.ColumnStatsSemanticAnalyzer}, + * which can't be refactored since this is linked against Hive 2.x + */ + private static String genPartValueString(String partColType, String partVal) { + String returnVal = partVal; + if (partColType.equals(serdeConstants.STRING_TYPE_NAME) || + partColType.contains(serdeConstants.VARCHAR_TYPE_NAME) || + partColType.contains(serdeConstants.CHAR_TYPE_NAME)) { + returnVal = "'" + escapeSQLString(partVal) + "'"; + } else if (partColType.equals(serdeConstants.TINYINT_TYPE_NAME)) { + returnVal = partVal + "Y"; + } else if (partColType.equals(serdeConstants.SMALLINT_TYPE_NAME)) { + returnVal = partVal + "S"; + } else if (partColType.equals(serdeConstants.INT_TYPE_NAME)) { + returnVal = partVal; + } else if (partColType.equals(serdeConstants.BIGINT_TYPE_NAME)) { + returnVal = partVal + "L"; + } else if (partColType.contains(serdeConstants.DECIMAL_TYPE_NAME)) { + returnVal = partVal + "BD"; + } else if (partColType.equals(serdeConstants.DATE_TYPE_NAME) || + partColType.equals(serdeConstants.TIMESTAMP_TYPE_NAME)) { + returnVal = partColType + " '" + escapeSQLString(partVal) + "'"; + } else { + //for other usually not used types, just quote the value + returnVal = "'" + escapeSQLString(partVal) + "'"; + } + + return returnVal; + } + private static boolean isFullAcidTable(Table t) { + if (t.getParametersSize() <= 0) { + //cannot be acid + return false; + } + String transacationalValue = t.getParameters() + .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (transacationalValue != null && "true".equalsIgnoreCase(transacationalValue)) { + System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t)); + return true; + } + return false; + } + private static boolean isAcidEnabled(HiveConf hiveConf) { + String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); + boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; + return txnMgr.equals(dbTxnMgr) && concurrency; + } + + private static class CompactionMetaInfo { + /** + * total number of bytes to be compacted across all compaction commands + */ + long numberOfBytes; + /** + * IDs of compactions launched by this utility + */ + Set<Long> compactionIds = new HashSet<>(); + } + + @VisibleForTesting + static abstract class Callback { + /** + * This is a hack enable Unit testing. Derby can't handle multiple concurrent threads but + * somehow Compactor needs to run to test "execute" mode. This callback can be used + * to run Worker. For TESTING ONLY. 
+ */ + void onWaitForCompaction() throws MetaException {} + } + @VisibleForTesting + static Callback callback; + @VisibleForTesting + static int pollIntervalMs = 1000*30; + /** + * can set it from tests to test when config needs something other than default values + */ + @VisibleForTesting + static HiveConf hiveConf = null; +} http://git-wip-us.apache.org/repos/asf/hive/blob/28b24dbf/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java ---------------------------------------------------------------------- diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java new file mode 100644 index 0000000..4fe7007 --- /dev/null +++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.upgrade.acid; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.compactor.Worker; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TestPreUpgradeTool { + private static final Logger LOG = LoggerFactory.getLogger(TestPreUpgradeTool.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + 
TestPreUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + + private String getTestDataDir() { + return TEST_DATA_DIR; + } + + /** + * preUpgrade: test tables that need to be compacted, waits for compaction + * postUpgrade: generates scripts w/o asserts + */ + @Test + public void testUpgrade() throws Exception { + int[][] data = {{1,2}, {3, 4}, {5, 6}}; + int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}}; + runStatementOnDriver("drop table if exists TAcid"); + runStatementOnDriver("drop table if exists TAcidPart"); + runStatementOnDriver("drop table if exists TFlat"); + runStatementOnDriver("drop table if exists TFlatText"); + + runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p tinyint) clustered by (b) into 2 buckets stored" + + " as orc TBLPROPERTIES ('transactional'='true')"); + //on 2.x these are guaranteed to not be acid + runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')"); + runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')"); + + + //this needs major compaction + runStatementOnDriver("insert into TAcid" + makeValuesClause(data)); + runStatementOnDriver("update TAcid set a = 1 where b = 2"); + + //this table needs to be converted to CRUD Acid + runStatementOnDriver("insert into TFlat" + makeValuesClause(data)); + + //this table needs to be converted to MM + runStatementOnDriver("insert into TFlatText" + makeValuesClause(data)); + + //p=10 needs major compaction + runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart)); + runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10"); + + //todo: add partitioned table that needs conversion to MM/Acid + + //todo: rename files case + String[] args = {"-location", getTestDataDir(), "-execute"}; + PreUpgradeTool.callback = new PreUpgradeTool.Callback() { + @Override + void onWaitForCompaction() throws MetaException { + runWorker(hiveConf); + } + }; + PreUpgradeTool.pollIntervalMs = 1; + PreUpgradeTool.hiveConf = hiveConf; + PreUpgradeTool.main(args); + /* + todo: parse + target/tmp/org.apache.hadoop.hive.upgrade.acid.TestPreUpgradeTool-1527286256834/compacts_1527286277624.sql + make sure it's the only 'compacts' file and contains + ALTER TABLE default.tacid COMPACT 'major'; +ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major'; + * */ + + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(2, resp.getCompactsSize()); + for(ShowCompactResponseElement e : resp.getCompacts()) { + Assert.assertEquals(e.toString(), TxnStore.CLEANING_RESPONSE, e.getState()); + } + + String[] args2 = {"-location", getTestDataDir()}; + PreUpgradeTool.main(args2); + /* + * todo: parse compacts script - make sure there is nothing in it + * */ + } + + private static void runWorker(HiveConf hiveConf) throws MetaException { + AtomicBoolean stop = new AtomicBoolean(true); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(hiveConf); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + } + + private static String makeValuesClause(int[][] rows) { + assert rows.length > 0; + StringBuilder sb = new 
StringBuilder(" values"); + for(int[] row : rows) { + assert row.length > 0; + if(row.length > 1) { + sb.append("("); + } + for(int value : row) { + sb.append(value).append(","); + } + sb.setLength(sb.length() - 1);//remove trailing comma + if(row.length > 1) { + sb.append(")"); + } + sb.append(","); + } + sb.setLength(sb.length() - 1);//remove trailing comma + return sb.toString(); + } + + private List<String> runStatementOnDriver(String stmt) throws Exception { + CommandProcessorResponse cpr = d.run(stmt); + if(cpr.getResponseCode() != 0) { + throw new RuntimeException(stmt + " failed: " + cpr); + } + List<String> rs = new ArrayList<String>(); + d.getResults(rs); + return rs; + } + @Before + public void setUp() throws Exception { + setUpInternal(); + } + private void initHiveConf() { + hiveConf = new HiveConf(this.getClass()); + } + @Rule + public TestName testName = new TestName(); + private HiveConf hiveConf; + private Driver d; + private void setUpInternal() throws Exception { + initHiveConf(); + TxnDbUtil.cleanDb();//todo: api changed in 3.0 + FileUtils.deleteDirectory(new File(getTestDataDir())); + + Path workDir = new Path(System.getProperty("test.tmp.dir", + "target" + File.separator + "test" + File.separator + "tmp")); + hiveConf.set("mapred.local.dir", workDir + File.separator + this.getClass().getSimpleName() + + File.separator + "mapred" + File.separator + "local"); + hiveConf.set("mapred.system.dir", workDir + File.separator + this.getClass().getSimpleName() + + File.separator + "mapred" + File.separator + "system"); + hiveConf.set("mapreduce.jobtracker.staging.root.dir", workDir + File.separator + this.getClass().getSimpleName() + + File.separator + "mapred" + File.separator + "staging"); + hiveConf.set("mapred.temp.dir", workDir + File.separator + this.getClass().getSimpleName() + + File.separator + "mapred" + File.separator + "temp"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir()); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); + hiveConf + .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + TxnDbUtil.setConfValues(hiveConf); + TxnDbUtil.prepDb();//todo: api changed in 3.0 + File f = new File(getWarehouseDir()); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if (!(new File(getWarehouseDir()).mkdirs())) { + throw new RuntimeException("Could not create " + getWarehouseDir()); + } + SessionState ss = SessionState.start(hiveConf); + ss.applyAuthorizationPolicy(); + d = new Driver(new QueryState(hiveConf), null); + d.setMaxRows(10000); + } + private String getWarehouseDir() { + return getTestDataDir() + "/warehouse"; + } + @After + public void tearDown() throws Exception { + if (d != null) { + d.close(); + d.destroy(); + d = null; + } + } + +}
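The "todo: parse" step in testUpgrade above could be fleshed out with a small assertion helper along these lines. This is only a sketch: assertCompactionScript is a hypothetical method, not part of this patch, and the expected ALTER TABLE lines are taken from the comment in the test itself.

    // Hypothetical helper for the "todo: parse" step in testUpgrade; not part of the patch.
    // Expects exactly one compacts_*.sql file in the tool's output dir and checks its contents.
    private static void assertCompactionScript(String outputDir) throws java.io.IOException {
      java.io.File[] scripts = new java.io.File(outputDir)
          .listFiles((dir, name) -> name.startsWith("compacts_") && name.endsWith(".sql"));
      org.junit.Assert.assertNotNull(scripts);
      org.junit.Assert.assertEquals(1, scripts.length);
      String sql = new String(java.nio.file.Files.readAllBytes(scripts[0].toPath()));
      org.junit.Assert.assertTrue(sql, sql.contains("ALTER TABLE default.tacid COMPACT 'major';"));
      org.junit.Assert.assertTrue(sql,
          sql.contains("ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major';"));
    }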