PHOENIX-2743 HivePhoenixHandler for big-big join with predicate push down

Closes apache/phoenix#155, apache/phoenix#165

Signed-off-by: Josh Elser <els...@apache.org>
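
As a usage sketch (drawn from the new HivePhoenixStoreIT included below; the ZooKeeper
quorum value here is illustrative, the test uses its mini-cluster's client port), the
storage handler is exercised from Hive along these lines:

    CREATE TABLE phoenix_table (id STRING, salary STRING)
    STORED BY 'org.apache.phoenix.hive.PhoenixStorageHandler'
    TBLPROPERTIES (
        'phoenix.table.name' = 'phoenix_table',
        'phoenix.zookeeper.quorum' = 'localhost:2181',
        'phoenix.zookeeper.znode.parent' = 'hbase',
        'phoenix.rowkeys' = 'id');

    INSERT INTO TABLE phoenix_table VALUES ('10', '1000');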


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/537b90be
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/537b90be
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/537b90be

Branch: refs/heads/master
Commit: 537b90bef90ff650bb0f00fe0591e4fab6ec7391
Parents: d26b4eb
Author: Sergey Soldatov <sergey.solda...@gmail.com>
Authored: Mon Apr 18 23:36:36 2016 -0700
Committer: Josh Elser <els...@apache.org>
Committed: Tue Apr 19 18:08:26 2016 -0400

----------------------------------------------------------------------
 phoenix-hive/pom.xml                            |  158 +++
 .../apache/phoenix/hive/HivePhoenixStoreIT.java |  303 ++++
 .../org/apache/phoenix/hive/HiveTestUtil.java   | 1291 ++++++++++++++++++
 .../apache/phoenix/hive/PhoenixMetaHook.java    |  246 ++++
 .../phoenix/hive/PhoenixRecordUpdater.java      |  336 +++++
 .../org/apache/phoenix/hive/PhoenixRow.java     |   64 +
 .../org/apache/phoenix/hive/PhoenixRowKey.java  |   69 +
 .../org/apache/phoenix/hive/PhoenixSerDe.java   |  159 +++
 .../apache/phoenix/hive/PhoenixSerializer.java  |  169 +++
 .../phoenix/hive/PhoenixStorageHandler.java     |  212 +++
 .../PhoenixStorageHandlerConstants.java         |  108 ++
 .../hive/mapreduce/PhoenixInputFormat.java      |  269 ++++
 .../hive/mapreduce/PhoenixInputSplit.java       |  160 +++
 .../hive/mapreduce/PhoenixOutputFormat.java     |  112 ++
 .../hive/mapreduce/PhoenixRecordReader.java     |  216 +++
 .../hive/mapreduce/PhoenixRecordWriter.java     |  355 +++++
 .../hive/mapreduce/PhoenixResultWritable.java   |  211 +++
 .../AbstractPhoenixObjectInspector.java         |   59 +
 .../PhoenixBinaryObjectInspector.java           |   58 +
 .../PhoenixBooleanObjectInspector.java          |   50 +
 .../PhoenixByteObjectInspector.java             |   54 +
 .../PhoenixCharObjectInspector.java             |   51 +
 .../PhoenixDateObjectInspector.java             |   63 +
 .../PhoenixDecimalObjectInspector.java          |   63 +
 .../PhoenixDoubleObjectInspector.java           |   54 +
 .../PhoenixFloatObjectInspector.java            |   55 +
 .../PhoenixIntObjectInspector.java              |   51 +
 .../PhoenixListObjectInspector.java             |  105 ++
 .../PhoenixLongObjectInspector.java             |   51 +
 .../PhoenixObjectInspectorFactory.java          |  148 ++
 .../PhoenixShortObjectInspector.java            |   51 +
 .../PhoenixStringObjectInspector.java           |   72 +
 .../PhoenixTimestampObjectInspector.java        |   61 +
 .../hive/ppd/PhoenixPredicateDecomposer.java    |   82 ++
 .../ppd/PhoenixPredicateDecomposerManager.java  |   83 ++
 .../hive/ql/index/IndexPredicateAnalyzer.java   |  523 +++++++
 .../hive/ql/index/IndexSearchCondition.java     |  143 ++
 .../hive/ql/index/PredicateAnalyzerFactory.java |   40 +
 .../phoenix/hive/query/PhoenixQueryBuilder.java |  760 +++++++++++
 .../hive/util/PhoenixConnectionUtil.java        |   97 ++
 .../hive/util/PhoenixStorageHandlerUtil.java    |  278 ++++
 .../apache/phoenix/hive/util/PhoenixUtil.java   |  208 +++
 phoenix-server/pom.xml                          |    6 +
 pom.xml                                         |   39 +-
 44 files changed, 7741 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-hive/pom.xml b/phoenix-hive/pom.xml
new file mode 100644
index 0000000..e4b4a49
--- /dev/null
+++ b/phoenix-hive/pom.xml
@@ -0,0 +1,158 @@
+<?xml version='1.0'?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.phoenix</groupId>
+    <artifactId>phoenix</artifactId>
+    <version>4.8.0-HBase-1.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>phoenix-hive</artifactId>
+  <name>Phoenix - Hive</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${hive.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-it</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>${maven-dependency-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-jar-with-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java
new file mode 100644
index 0000000..a707a06
--- /dev/null
+++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test class to run all Hive Phoenix integration tests against a MINI Map-Reduce cluster.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HivePhoenixStoreIT {
+
+    private static final Log LOG = LogFactory.getLog(HivePhoenixStoreIT.class);
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+    private static Connection conn;
+    private static Configuration conf;
+    private static HiveTestUtil qt;
+    private static String hiveOutputDir;
+    private static String hiveLogDir;
+
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+        if (null != hadoopConfDir && !hadoopConfDir.isEmpty()) {
+          LOG.warn("WARNING: HADOOP_CONF_DIR is set in the environment which may cause "
+              + "issues with test execution via MiniDFSCluster");
+        }
+        hbaseTestUtil = new HBaseTestingUtility();
+        conf = hbaseTestUtil.getConfiguration();
+        setUpConfigForMiniCluster(conf);
+        conf.set(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        hiveOutputDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_output").toString();
+        File outputDir = new File(hiveOutputDir);
+        outputDir.mkdirs();
+        hiveLogDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_log").toString();
+        File logDir = new File(hiveLogDir);
+        logDir.mkdirs();
+        // Setup Hive mini Server
+        Path testRoot = hbaseTestUtil.getDataTestDir();
+        System.setProperty("test.tmp.dir", testRoot.toString());
+        System.setProperty("test.warehouse.dir", (new Path(testRoot, "warehouse")).toString());
+
+        HiveTestUtil.MiniClusterType miniMR = HiveTestUtil.MiniClusterType.mr;
+        try {
+            qt = new HiveTestUtil(hiveOutputDir, hiveLogDir, miniMR, null);
+        } catch (Exception e) {
+            LOG.error("Unexpected exception in setup", e);
+            fail("Unexpected exception in setup");
+        }
+
+        //Start HBase cluster
+        hbaseTestUtil.startMiniCluster(3);
+        MiniDFSCluster x = hbaseTestUtil.getDFSCluster();
+
+        Class.forName(PhoenixDriver.class.getName());
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL +
+                PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum, props);
+        // Setup Hive Output Folder
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("create table t(a integer primary key,b varchar)");
+    }
+
+    /**
+     * Create a table with two columns, insert one row, and check that the Phoenix table is
+     * created and the row is there.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void simpleTest() throws Exception {
+        String testName = "simpleTest";
+        // create a dummy outfile under log folder
+        hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
+        createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
+        createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString());
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE phoenix_table(ID STRING, SALARY STRING)" + 
HiveTestUtil.CRLF +
+                " STORED BY  
\"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
+                .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF +
+                "   'phoenix.table.name'='phoenix_table'," + HiveTestUtil.CRLF 
+
+                "   'phoenix.zookeeper.znode.parent'='hbase'," + 
HiveTestUtil.CRLF +
+                "   'phoenix.zookeeper.quorum'='localhost:" + 
hbaseTestUtil.getZkCluster()
+                .getClientPort() + "', 'phoenix.rowkeys'='id');");
+        sb.append("INSERT INTO TABLE phoenix_table" + HiveTestUtil.CRLF +
+                "VALUES ('10', '1000');" + HiveTestUtil.CRLF);
+        String fullPath = new Path(hbaseTestUtil.getDataTestDir(), 
testName).toString();
+        createFile(sb.toString(), fullPath);
+        runTest(testName, fullPath);
+
+        String phoenixQuery = "SELECT * FROM phoenix_table";
+        PreparedStatement statement = conn.prepareStatement(phoenixQuery);
+        ResultSet rs = statement.executeQuery();
+        assert (rs.getMetaData().getColumnCount() == 2);
+        assertTrue(rs.next());
+        assert (rs.getString(1).equals("10"));
+        assert (rs.getString(2).equals("1000"));
+
+    }
+
+    /**
+     * Datatype Test
+     *
+     * @throws Exception
+     */
+    @Test
+    public void dataTypeTest() throws Exception {
+        String testName = "dataTypeTest";
+        // create a dummy outfile under log folder
+        hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
+        createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
+        createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString());
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE phoenix_datatype(ID int, description STRING, 
ts TIMESTAMP, db " +
+                "DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
+                " STORED BY  
\"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
+                .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF +
+                "   'phoenix.hbase.table.name'='phoenix_datatype'," + 
HiveTestUtil.CRLF +
+                "   'phoenix.zookeeper.znode.parent'='hbase'," + 
HiveTestUtil.CRLF +
+                "   'phoenix.zookeeper.quorum'='localhost:" + 
hbaseTestUtil.getZkCluster()
+                .getClientPort() + "'," + HiveTestUtil.CRLF +
+                "   'phoenix.rowkeys'='id');");
+        sb.append("INSERT INTO TABLE phoenix_datatype" + HiveTestUtil.CRLF +
+                "VALUES (10, \"foodesc\",\"2013-01-05 01:01:01\",200,2.0,-1);" 
+ HiveTestUtil.CRLF);
+        String fullPath = new Path(hbaseTestUtil.getDataTestDir(), 
testName).toString();
+        createFile(sb.toString(), fullPath);
+        runTest(testName, fullPath);
+
+        String phoenixQuery = "SELECT * FROM phoenix_datatype";
+        PreparedStatement statement = conn.prepareStatement(phoenixQuery);
+        ResultSet rs = statement.executeQuery();
+        assert (rs.getMetaData().getColumnCount() == 6);
+        while (rs.next()) {
+            assert (rs.getInt(1) == 10);
+            assert (rs.getString(2).equalsIgnoreCase("foodesc"));
+            /* Need a way to correctly handle timestamps, since Hive's implementation uses
+            time zone information but Phoenix's doesn't.
+             */
+            //assert(rs.getTimestamp(3).equals(Timestamp.valueOf("2013-01-05 02:01:01")));
+            assert (rs.getDouble(4) == 200);
+            assert (rs.getFloat(5) == 2.0);
+            assert (rs.getInt(6) == -1);
+        }
+    }
+
+    /**
+     * Multi-key Test
+     *
+     * @throws Exception
+     */
+    @Test
+    public void MultiKey() throws Exception {
+        String testName = "MultiKey";
+        // create a dummy outfile under log folder
+        hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
+        createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
+        createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString());
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE phoenix_MultiKey(ID int, ID2 
String,description STRING, ts " +
+                "TIMESTAMP, db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
+                " STORED BY  
\"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
+                .CRLF +
+                " TBLPROPERTIES(" + HiveTestUtil.CRLF +
+                "   'phoenix.hbase.table.name'='phoenix_MultiKey'," + 
HiveTestUtil.CRLF +
+                "   'phoenix.zookeeper.znode.parent'='hbase'," + 
HiveTestUtil.CRLF +
+                "   'phoenix.zookeeper.quorum'='localhost:" + 
hbaseTestUtil.getZkCluster()
+                .getClientPort() + "'," + HiveTestUtil.CRLF +
+                "   'phoenix.rowkeys'='id,id2');");
+        sb.append("INSERT INTO TABLE phoenix_MultiKey" + HiveTestUtil.CRLF +
+                "VALUES (10,  \"part2\",\"foodesc\",\"2013-01-05 
01:01:01\",200,2.0,-1);" +
+                HiveTestUtil.CRLF);
+        String fullPath = new Path(hbaseTestUtil.getDataTestDir(), 
testName).toString();
+        createFile(sb.toString(), fullPath);
+        runTest(testName, fullPath);
+
+        String phoenixQuery = "SELECT * FROM phoenix_MultiKey";
+        PreparedStatement statement = conn.prepareStatement(phoenixQuery);
+        ResultSet rs = statement.executeQuery();
+        assert (rs.getMetaData().getColumnCount() == 7);
+        while (rs.next()) {
+            assert (rs.getInt(1) == 10);
+            assert (rs.getString(2).equalsIgnoreCase("part2"));
+            assert (rs.getString(3).equalsIgnoreCase("foodesc"));
+            //assert(rs.getTimestamp(4).equals(Timestamp.valueOf("2013-01-05 02:01:01")));
+            assert (rs.getDouble(5) == 200);
+            assert (rs.getFloat(6) == 2.0);
+            assert (rs.getInt(7) == -1);
+        }
+    }
+
+
+    private void runTest(String fname, String fpath) throws Exception {
+        long startTime = System.currentTimeMillis();
+        try {
+            LOG.info("Begin query: " + fname);
+            qt.addFile(fpath);
+
+            if (qt.shouldBeSkipped(fname)) {
+                LOG.info("Test " + fname + " skipped");
+                return;
+            }
+
+            qt.cliInit(fname);
+            qt.clearTestSideEffects();
+            int ecode = qt.executeClient(fname);
+            if (ecode != 0) {
+                qt.failed(ecode, fname, null);
+            }
+
+            ecode = qt.checkCliDriverResults(fname);
+            if (ecode != 0) {
+                qt.failedDiff(ecode, fname, null);
+            }
+            qt.clearPostTestEffects();
+
+        } catch (Throwable e) {
+            qt.failed(e, fname, null);
+        }
+
+        long elapsedTime = System.currentTimeMillis() - startTime;
+        LOG.info("Done query: " + fname + " elapsedTime=" + elapsedTime / 1000 
+ "s");
+        assertTrue("Test passed", true);
+    }
+
+    private void createFile(String content, String fullName) throws IOException {
+        FileUtils.write(new File(fullName), content);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        if (qt != null) {
+            try {
+                qt.shutdown();
+            } catch (Exception e) {
+                LOG.error("Unexpected exception in setup", e);
+                fail("Unexpected exception in tearDown");
+            }
+        }
+        try {
+            conn.close();
+        } finally {
+            try {
+                PhoenixDriver.INSTANCE.close();
+            } finally {
+                try {
+                    DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+                } finally {
+                    hbaseTestUtil.shutdownMiniCluster();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java
new file mode 100644
index 0000000..57722f8
--- /dev/null
+++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java
@@ -0,0 +1,1291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import com.google.common.collect.ImmutableList;
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hive.cli.CliDriver;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.io.CachingPrintStream;
+import org.apache.hadoop.hive.common.io.DigestPrintStream;
+import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
+import org.apache.hadoop.hive.common.io.SortPrintStream;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.util.Shell;
+import org.apache.hive.common.util.StreamPrinter;
+import org.apache.tools.ant.BuildException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.StringWriter;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * HiveTestUtil cloned from Hive QTestUtil. Can be outdated and may require 
update once a problem
+ * found.
+ */
+public class HiveTestUtil {
+
+    public static final String UTF_8 = "UTF-8";
+    private static final Log LOG = LogFactory.getLog("HiveTestUtil");
+    private static final String QTEST_LEAVE_FILES = "QTEST_LEAVE_FILES";
+    public static final String DEFAULT_DATABASE_NAME = "default";
+
+    private String testWarehouse;
+    private final String testFiles;
+    protected final String outDir;
+    protected final String logDir;
+    private final TreeMap<String, String> qMap;
+    private final Set<String> qSkipSet;
+    private final Set<String> qSortSet;
+    private final Set<String> qSortQuerySet;
+    private final Set<String> qHashQuerySet;
+    private final Set<String> qSortNHashQuerySet;
+    private final Set<String> qJavaVersionSpecificOutput;
+    private static final String SORT_SUFFIX = ".sorted";
+    private static MiniClusterType clusterType = MiniClusterType.none;
+    private ParseDriver pd;
+    protected Hive db;
+    protected HiveConf conf;
+    private BaseSemanticAnalyzer sem;
+    protected final boolean overWrite;
+    private CliDriver cliDriver;
+    private HadoopShims.MiniMrShim mr = null;
+    private HadoopShims.MiniDFSShim dfs = null;
+    private String hadoopVer = null;
+    private HiveTestSetup setup = null;
+    private boolean isSessionStateStarted = false;
+    private static final String javaVersion = getJavaVersion();
+
+    private String initScript = "";
+    private String cleanupScript = "";
+
+    public HiveConf getConf() {
+        return conf;
+    }
+
+    public boolean deleteDirectory(File path) {
+        if (path.exists()) {
+            File[] files = path.listFiles();
+            for (File file : files) {
+                if (file.isDirectory()) {
+                    deleteDirectory(file);
+                } else {
+                    file.delete();
+                }
+            }
+        }
+        return (path.delete());
+    }
+
+    public void copyDirectoryToLocal(Path src, Path dest) throws Exception {
+
+        FileSystem srcFs = src.getFileSystem(conf);
+        FileSystem destFs = dest.getFileSystem(conf);
+        if (srcFs.exists(src)) {
+            FileStatus[] files = srcFs.listStatus(src);
+            for (FileStatus file : files) {
+                String name = file.getPath().getName();
+                Path dfs_path = file.getPath();
+                Path local_path = new Path(dest, name);
+
+                if (file.isDir()) {
+                    if (!destFs.exists(local_path)) {
+                        destFs.mkdirs(local_path);
+                    }
+                    copyDirectoryToLocal(dfs_path, local_path);
+                } else {
+                    srcFs.copyToLocalFile(dfs_path, local_path);
+                }
+            }
+        }
+    }
+
+    static Pattern mapTok = Pattern.compile("(\\.?)(.*)_map_(.*)");
+    static Pattern reduceTok = Pattern.compile("(.*)(reduce_[^\\.]*)((\\..*)?)");
+
+    public void normalizeNames(File path) throws Exception {
+        if (path.isDirectory()) {
+            File[] files = path.listFiles();
+            for (File file : files) {
+                normalizeNames(file);
+            }
+        } else {
+            Matcher m = reduceTok.matcher(path.getName());
+            if (m.matches()) {
+                String name = m.group(1) + "reduce" + m.group(3);
+                path.renameTo(new File(path.getParent(), name));
+            } else {
+                m = mapTok.matcher(path.getName());
+                if (m.matches()) {
+                    String name = m.group(1) + "map_" + m.group(3);
+                    path.renameTo(new File(path.getParent(), name));
+                }
+            }
+        }
+    }
+
+    public String getOutputDirectory() {
+        return outDir;
+    }
+
+    public String getLogDirectory() {
+        return logDir;
+    }
+
+    private String getHadoopMainVersion(String input) {
+        if (input == null) {
+            return null;
+        }
+        Pattern p = Pattern.compile("^(\\d+\\.\\d+).*");
+        Matcher m = p.matcher(input);
+        if (m.matches()) {
+            return m.group(1);
+        }
+        return null;
+    }
+
+    public void initConf() throws Exception {
+        // Plug verifying metastore in for testing.
+        conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
+                "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
+
+        if (mr != null) {
+            assert dfs != null;
+
+            mr.setupConfiguration(conf);
+
+            // set fs.default.name to the uri of mini-dfs
+            String dfsUriString = WindowsPathUtil.getHdfsUriString(dfs.getFileSystem().getUri()
+                    .toString());
+            conf.setVar(HiveConf.ConfVars.HADOOPFS, dfsUriString);
+            // hive.metastore.warehouse.dir needs to be set relative to the mini-dfs
+            conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
+                    (new Path(dfsUriString,
+                            "/build/ql/test/data/warehouse/")).toString());
+        }
+
+        // Windows paths should be converted after MiniMrShim.setupConfiguration()
+        // since setupConfiguration may overwrite configuration values.
+        if (Shell.WINDOWS) {
+            WindowsPathUtil.convertPathsFromWindowsToHdfs(conf);
+        }
+    }
+
+    public enum MiniClusterType {
+        mr,
+        tez,
+        none;
+
+        public static MiniClusterType valueForString(String type) {
+            if (type.equals("miniMR")) {
+                return mr;
+            } else if (type.equals("tez")) {
+                return tez;
+            } else {
+                return none;
+            }
+        }
+    }
+
+    public HiveTestUtil(String outDir, String logDir, MiniClusterType clusterType, String hadoopVer)
+            throws Exception {
+        this(outDir, logDir, clusterType, null, hadoopVer);
+    }
+
+    public HiveTestUtil(String outDir, String logDir, MiniClusterType clusterType, String confDir,
+                        String hadoopVer)
+            throws Exception {
+        this.outDir = outDir;
+        this.logDir = logDir;
+        if (confDir != null && !confDir.isEmpty()) {
+            HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath()
+                    + "/hive-site.xml"));
+            LOG.info("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+        }
+        conf = new HiveConf();
+        String tmpBaseDir = System.getProperty("test.tmp.dir");
+        if (tmpBaseDir == null || tmpBaseDir.isEmpty()) {
+            tmpBaseDir = System.getProperty("java.io.tmpdir");
+        }
+        String metaStoreURL = "jdbc:derby:" + tmpBaseDir + File.separator + "metastore_dbtest;" +
+                "create=true";
+        conf.set(ConfVars.METASTORECONNECTURLKEY.varname, metaStoreURL);
+        System.setProperty(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, metaStoreURL);
+
+        //set where derby logs
+        File derbyLogFile = new File(tmpBaseDir + "/derby.log");
+        derbyLogFile.createNewFile();
+        System.setProperty("derby.stream.error.file", derbyLogFile.getPath());
+
+        this.hadoopVer = getHadoopMainVersion(hadoopVer);
+        qMap = new TreeMap<String, String>();
+        qSkipSet = new HashSet<String>();
+        qSortSet = new HashSet<String>();
+        qSortQuerySet = new HashSet<String>();
+        qHashQuerySet = new HashSet<String>();
+        qSortNHashQuerySet = new HashSet<String>();
+        qJavaVersionSpecificOutput = new HashSet<String>();
+        this.clusterType = clusterType;
+
+        HadoopShims shims = ShimLoader.getHadoopShims();
+        int numberOfDataNodes = 4;
+
+        if (clusterType != MiniClusterType.none) {
+            dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
+            FileSystem fs = dfs.getFileSystem();
+            String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
+            if (clusterType == MiniClusterType.tez) {
+                mr = shims.getMiniTezCluster(conf, 4, uriString, 1);
+            } else {
+                mr = shims.getMiniMrCluster(conf, 4, uriString, 1);
+            }
+        }
+
+        initConf();
+
+        // Use the current directory if it is not specified
+        String dataDir = conf.get("test.data.files");
+        if (dataDir == null) {
+            dataDir = new File(".").getAbsolutePath() + "/data/files";
+        }
+
+        testFiles = dataDir;
+
+        // Use the current directory if it is not specified
+        String scriptsDir = conf.get("test.data.scripts");
+        if (scriptsDir == null) {
+            scriptsDir = new File(".").getAbsolutePath() + "/data/scripts";
+        }
+        if (!initScript.isEmpty()) {
+            this.initScript = scriptsDir + "/" + initScript;
+        }
+        if (!cleanupScript.isEmpty()) {
+            this.cleanupScript = scriptsDir + "/" + cleanupScript;
+        }
+
+        overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite"));
+
+        setup = new HiveTestSetup();
+        setup.preTest(conf);
+        init();
+    }
+
+    public void shutdown() throws Exception {
+        cleanUp();
+        setup.tearDown();
+        if (mr != null) {
+            mr.shutdown();
+            mr = null;
+        }
+        FileSystem.closeAll();
+        if (dfs != null) {
+            dfs.shutdown();
+            dfs = null;
+        }
+    }
+
+    public String readEntireFileIntoString(File queryFile) throws IOException {
+        InputStreamReader isr = new InputStreamReader(
+                new BufferedInputStream(new FileInputStream(queryFile)), HiveTestUtil.UTF_8);
+        StringWriter sw = new StringWriter();
+        try {
+            IOUtils.copy(isr, sw);
+        } finally {
+            if (isr != null) {
+                isr.close();
+            }
+        }
+        return sw.toString();
+    }
+
+    public void addFile(String queryFile) throws IOException {
+        addFile(queryFile, false);
+    }
+
+    public void addFile(String queryFile, boolean partial) throws IOException {
+        addFile(new File(queryFile));
+    }
+
+    public void addFile(File qf) throws IOException {
+        addFile(qf, false);
+    }
+
+    public void addFile(File qf, boolean partial) throws IOException {
+        String query = readEntireFileIntoString(qf);
+        qMap.put(qf.getName(), query);
+        if (partial) return;
+
+        if (matches(SORT_BEFORE_DIFF, query)) {
+            qSortSet.add(qf.getName());
+        } else if (matches(SORT_QUERY_RESULTS, query)) {
+            qSortQuerySet.add(qf.getName());
+        } else if (matches(HASH_QUERY_RESULTS, query)) {
+            qHashQuerySet.add(qf.getName());
+        } else if (matches(SORT_AND_HASH_QUERY_RESULTS, query)) {
+            qSortNHashQuerySet.add(qf.getName());
+        }
+    }
+
+    private static final Pattern SORT_BEFORE_DIFF = Pattern.compile("-- SORT_BEFORE_DIFF");
+    private static final Pattern SORT_QUERY_RESULTS = Pattern.compile("-- SORT_QUERY_RESULTS");
+    private static final Pattern HASH_QUERY_RESULTS = Pattern.compile("-- HASH_QUERY_RESULTS");
+    private static final Pattern SORT_AND_HASH_QUERY_RESULTS = Pattern.compile("-- " +
+            "SORT_AND_HASH_QUERY_RESULTS");
+
+    private boolean matches(Pattern pattern, String query) {
+        Matcher matcher = pattern.matcher(query);
+        if (matcher.find()) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Get formatted Java version to include minor version, but
+     * exclude patch level.
+     *
+     * @return Java version formatted as major_version.minor_version
+     */
+    private static String getJavaVersion() {
+        String version = System.getProperty("java.version");
+        if (version == null) {
+            throw new NullPointerException("No java version could be determined " +
+                    "from system properties");
+        }
+
+        // "java version" system property is formatted
+        // major_version.minor_version.patch_level.
+        // Find second dot, instead of last dot, to be safe
+        int pos = version.indexOf('.');
+        pos = version.indexOf('.', pos + 1);
+        return version.substring(0, pos);
+    }
+
+    /**
+     * Clear out any side effects of running tests
+     */
+    public void clearPostTestEffects() throws Exception {
+        setup.postTest(conf);
+    }
+
+    /**
+     * Clear out any side effects of running tests
+     */
+    public void clearTablesCreatedDuringTests() throws Exception {
+        if (System.getenv(QTEST_LEAVE_FILES) != null) {
+            return;
+        }
+
+        // Delete any tables other than the source tables
+        // and any databases other than the default database.
+        for (String dbName : db.getAllDatabases()) {
+            SessionState.get().setCurrentDatabase(dbName);
+            for (String tblName : db.getAllTables()) {
+                if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
+                    Table tblObj = db.getTable(tblName);
+                    // an index table cannot be dropped directly. Dropping the base
+                    // table will automatically drop all of its index tables
+                    if (tblObj.isIndexTable()) {
+                        continue;
+                    }
+                    db.dropTable(dbName, tblName);
+                } else {
+                    // this table is defined in srcTables, drop all indexes on it
+                    List<Index> indexes = db.getIndexes(dbName, tblName, (short) -1);
+                    if (indexes != null && indexes.size() > 0) {
+                        for (Index index : indexes) {
+                            db.dropIndex(dbName, tblName, index.getIndexName(), true, true);
+                        }
+                    }
+                }
+            }
+            if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
+                // Drop cascade, may need to drop functions
+                db.dropDatabase(dbName, true, true, true);
+            }
+        }
+
+        // delete remaining directories for external tables (can affect stats for following tests)
+        try {
+            Path p = new Path(testWarehouse);
+            FileSystem fileSystem = p.getFileSystem(conf);
+            if (fileSystem.exists(p)) {
+                for (FileStatus status : fileSystem.listStatus(p)) {
+                    if (status.isDir()) {
+                        fileSystem.delete(status.getPath(), true);
+                    }
+                }
+            }
+        } catch (IllegalArgumentException e) {
+            // ignore.. provides invalid url sometimes intentionally
+        }
+        SessionState.get().setCurrentDatabase(DEFAULT_DATABASE_NAME);
+
+        List<String> roleNames = db.getAllRoleNames();
+        for (String roleName : roleNames) {
+            if (!"PUBLIC".equalsIgnoreCase(roleName) && 
!"ADMIN".equalsIgnoreCase(roleName)) {
+                db.dropRole(roleName);
+            }
+        }
+    }
+
+    /**
+     * Clear out any side effects of running tests
+     */
+    public void clearTestSideEffects() throws Exception {
+        if (System.getenv(QTEST_LEAVE_FILES) != null) {
+            return;
+        }
+
+        clearTablesCreatedDuringTests();
+    }
+
+    public void cleanUp() throws Exception {
+        if (!isSessionStateStarted) {
+            startSessionState();
+        }
+        if (System.getenv(QTEST_LEAVE_FILES) != null) {
+            return;
+        }
+
+        clearTablesCreatedDuringTests();
+
+        SessionState.get().getConf().setBoolean("hive.test.shutdown.phase", true);
+
+        if (cleanupScript != "") {
+            String cleanupCommands = readEntireFileIntoString(new File(cleanupScript));
+            LOG.info("Cleanup (" + cleanupScript + "):\n" + cleanupCommands);
+            if (cliDriver == null) {
+                cliDriver = new CliDriver();
+            }
+            cliDriver.processLine(cleanupCommands);
+        }
+
+        SessionState.get().getConf().setBoolean("hive.test.shutdown.phase", false);
+
+        // delete any contents in the warehouse dir
+        Path p = new Path(testWarehouse);
+        FileSystem fs = p.getFileSystem(conf);
+
+        try {
+            FileStatus[] ls = fs.listStatus(p);
+            for (int i = 0; (ls != null) && (i < ls.length); i++) {
+                fs.delete(ls[i].getPath(), true);
+            }
+        } catch (FileNotFoundException e) {
+            // Best effort
+        }
+
+        FunctionRegistry.unregisterTemporaryUDF("test_udaf");
+        FunctionRegistry.unregisterTemporaryUDF("test_error");
+    }
+
+    public void createSources() throws Exception {
+        if (!isSessionStateStarted) {
+            startSessionState();
+        }
+        conf.setBoolean("hive.test.init.phase", true);
+
+        if (cliDriver == null) {
+            cliDriver = new CliDriver();
+        }
+        cliDriver.processLine("set test.data.dir=" + testFiles + ";");
+
+        conf.setBoolean("hive.test.init.phase", false);
+    }
+
+    public void init() throws Exception {
+        testWarehouse = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+        String execEngine = conf.get("hive.execution.engine");
+        conf.set("hive.execution.engine", "mr");
+        SessionState.start(conf);
+        conf.set("hive.execution.engine", execEngine);
+        db = Hive.get(conf);
+        pd = new ParseDriver();
+        sem = new SemanticAnalyzer(conf);
+    }
+
+    public void init(String tname) throws Exception {
+        cleanUp();
+        createSources();
+        cliDriver.processCmd("set hive.cli.print.header=true;");
+    }
+
+    public void cliInit(String tname) throws Exception {
+        cliInit(tname, true);
+    }
+
+    public String cliInit(String tname, boolean recreate) throws Exception {
+        if (recreate) {
+            cleanUp();
+            createSources();
+        }
+
+        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
+                "org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator");
+        Utilities.clearWorkMap();
+        CliSessionState ss = new CliSessionState(conf);
+        assert ss != null;
+        ss.in = System.in;
+
+        String outFileExtension = getOutFileExtension(tname);
+        String stdoutName = null;
+        if (outDir != null) {
+            File qf = new File(outDir, tname);
+            stdoutName = qf.getName().concat(outFileExtension);
+        } else {
+            stdoutName = tname + outFileExtension;
+        }
+
+        File outf = new File(logDir, stdoutName);
+        OutputStream fo = new BufferedOutputStream(new FileOutputStream(outf));
+        if (qSortQuerySet.contains(tname)) {
+            ss.out = new SortPrintStream(fo, "UTF-8");
+        } else if (qHashQuerySet.contains(tname)) {
+            ss.out = new DigestPrintStream(fo, "UTF-8");
+        } else if (qSortNHashQuerySet.contains(tname)) {
+            ss.out = new SortAndDigestPrintStream(fo, "UTF-8");
+        } else {
+            ss.out = new PrintStream(fo, true, "UTF-8");
+        }
+        ss.err = new CachingPrintStream(fo, true, "UTF-8");
+        ss.setIsSilent(true);
+        SessionState oldSs = SessionState.get();
+
+        if (oldSs != null && clusterType == MiniClusterType.tez) {
+            oldSs.close();
+        }
+
+        if (oldSs != null && oldSs.out != null && oldSs.out != System.out) {
+            oldSs.out.close();
+        }
+        SessionState.start(ss);
+
+        cliDriver = new CliDriver();
+        cliDriver.processInitFiles(ss);
+
+        return outf.getAbsolutePath();
+    }
+
+    private CliSessionState startSessionState()
+            throws IOException {
+
+        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
+                "org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator");
+
+        String execEngine = conf.get("hive.execution.engine");
+        conf.set("hive.execution.engine", "mr");
+        CliSessionState ss = new CliSessionState(conf);
+        assert ss != null;
+        ss.in = System.in;
+        ss.out = System.out;
+        ss.err = System.out;
+
+        SessionState oldSs = SessionState.get();
+        if (oldSs != null && clusterType == MiniClusterType.tez) {
+            oldSs.close();
+        }
+        if (oldSs != null && oldSs.out != null && oldSs.out != System.out) {
+            oldSs.out.close();
+        }
+        SessionState.start(ss);
+
+        isSessionStateStarted = true;
+
+        conf.set("hive.execution.engine", execEngine);
+        return ss;
+    }
+
+    public int executeOne(String tname) {
+        String q = qMap.get(tname);
+
+        if (q.indexOf(";") == -1) {
+            return -1;
+        }
+
+        String q1 = q.substring(0, q.indexOf(";") + 1);
+        String qrest = q.substring(q.indexOf(";") + 1);
+        qMap.put(tname, qrest);
+
+        LOG.info("Executing " + q1);
+        return cliDriver.processLine(q1);
+    }
+
+    public static final String CRLF = System.getProperty("line.separator");
+
+    public int executeClient(String tname1, String tname2) {
+        String commands = getCommands(tname1) + CRLF + getCommands(tname2);
+        return cliDriver.processLine(commands);
+    }
+
+    public int executeClient(String tname) {
+        return cliDriver.processLine(getCommands(tname), false);
+    }
+
+    private String getCommands(String tname) {
+        String commands = qMap.get(tname);
+        StringBuilder newCommands = new StringBuilder(commands.length());
+        int lastMatchEnd = 0;
+        Matcher commentMatcher = Pattern.compile("^--.*$", Pattern.MULTILINE).matcher(commands);
+        while (commentMatcher.find()) {
+            newCommands.append(commands.substring(lastMatchEnd, commentMatcher.start()));
+            newCommands.append(commentMatcher.group().replaceAll("(?<!\\\\);", "\\\\;"));
+            lastMatchEnd = commentMatcher.end();
+        }
+        newCommands.append(commands.substring(lastMatchEnd, commands.length()));
+        commands = newCommands.toString();
+        return commands;
+    }
+
+    public boolean shouldBeSkipped(String tname) {
+        return qSkipSet.contains(tname);
+    }
+
+    private String getOutFileExtension(String fname) {
+        String outFileExtension = ".out";
+        if (qJavaVersionSpecificOutput.contains(fname)) {
+            outFileExtension = ".java" + javaVersion + ".out";
+        }
+
+        return outFileExtension;
+    }
+
+    /**
+     * Given the current configurations (e.g., hadoop version and execution mode), return
+     * the correct file name to compare with the current test run output.
+     *
+     * @param outDir   The directory where the reference log files are stored.
+     * @param testName The test file name (terminated by ".out").
+     * @return The file name appended with the configuration values if it exists.
+     */
+    public String outPath(String outDir, String testName) {
+        String ret = (new File(outDir, testName)).getPath();
+        // List of configurations. Currently the list consists of hadoop version and execution
+        // mode only
+        List<String> configs = new ArrayList<String>();
+        configs.add(this.hadoopVer);
+
+        Deque<String> stack = new LinkedList<String>();
+        StringBuilder sb = new StringBuilder();
+        sb.append(testName);
+        stack.push(sb.toString());
+
+        // example file names are input1.q.out_0.20.0_minimr or input2.q.out_0.17
+        for (String s : configs) {
+            sb.append('_');
+            sb.append(s);
+            stack.push(sb.toString());
+        }
+        while (stack.size() > 0) {
+            String fileName = stack.pop();
+            File f = new File(outDir, fileName);
+            if (f.exists()) {
+                ret = f.getPath();
+                break;
+            }
+        }
+        return ret;
+    }
+
+    private Pattern[] toPattern(String[] patternStrs) {
+        Pattern[] patterns = new Pattern[patternStrs.length];
+        for (int i = 0; i < patternStrs.length; i++) {
+            patterns[i] = Pattern.compile(patternStrs[i]);
+        }
+        return patterns;
+    }
+
+    private void maskPatterns(Pattern[] patterns, String fname) throws Exception {
+        String maskPattern = "#### A masked pattern was here ####";
+
+        String line;
+        BufferedReader in;
+        BufferedWriter out;
+
+        File file = new File(fname);
+        File fileOrig = new File(fname + ".orig");
+        FileUtils.copyFile(file, fileOrig);
+
+        in = new BufferedReader(new InputStreamReader(new FileInputStream(fileOrig), "UTF-8"));
+        out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8"));
+
+        boolean lastWasMasked = false;
+        while (null != (line = in.readLine())) {
+            for (Pattern pattern : patterns) {
+                line = pattern.matcher(line).replaceAll(maskPattern);
+            }
+
+            if (line.equals(maskPattern)) {
+                // We're folding multiple masked lines into one.
+                if (!lastWasMasked) {
+                    out.write(line);
+                    out.write("\n");
+                    lastWasMasked = true;
+                }
+            } else {
+                out.write(line);
+                out.write("\n");
+                lastWasMasked = false;
+            }
+        }
+
+        in.close();
+        out.close();
+    }
+
+    private final Pattern[] planMask = toPattern(new String[]{
+            ".*file:.*",
+            ".*pfile:.*",
+            ".*hdfs:.*",
+            ".*/tmp/.*",
+            ".*invalidscheme:.*",
+            ".*lastUpdateTime.*",
+            ".*lastAccessTime.*",
+            ".*lastModifiedTime.*",
+            ".*[Oo]wner.*",
+            ".*CreateTime.*",
+            ".*LastAccessTime.*",
+            ".*Location.*",
+            ".*LOCATION '.*",
+            ".*transient_lastDdlTime.*",
+            ".*last_modified_.*",
+            ".*at org.*",
+            ".*at sun.*",
+            ".*at java.*",
+            ".*at junit.*",
+            ".*Caused by:.*",
+            ".*LOCK_QUERYID:.*",
+            ".*LOCK_TIME:.*",
+            ".*grantTime.*",
+            ".*[.][.][.] [0-9]* more.*",
+            ".*job_[0-9_]*.*",
+            ".*job_local[0-9_]*.*",
+            ".*USING 'java -cp.*",
+            "^Deleted.*",
+            ".*DagName:.*",
+            ".*Input:.*/data/files/.*",
+            ".*Output:.*/data/files/.*",
+            ".*total number of created files now is.*"
+    });
+
+    public int checkCliDriverResults(String tname) throws Exception {
+        assert (qMap.containsKey(tname));
+
+        String outFileExtension = getOutFileExtension(tname);
+        String outFileName = outPath(outDir, tname + outFileExtension);
+
+        File f = new File(logDir, tname + outFileExtension);
+
+        maskPatterns(planMask, f.getPath());
+        int exitVal = executeDiffCommand(f.getPath(),
+                outFileName, false,
+                qSortSet.contains(tname));
+
+        if (exitVal != 0 && overWrite) {
+            exitVal = overwriteResults(f.getPath(), outFileName);
+        }
+
+        return exitVal;
+    }
+
+
+    public int checkCompareCliDriverResults(String tname, List<String> outputs) throws Exception {
+        assert outputs.size() > 1;
+        maskPatterns(planMask, outputs.get(0));
+        for (int i = 1; i < outputs.size(); ++i) {
+            maskPatterns(planMask, outputs.get(i));
+            int ecode = executeDiffCommand(
+                    outputs.get(i - 1), outputs.get(i), false, qSortSet.contains(tname));
+            if (ecode != 0) {
+                LOG.info("Files don't match: " + outputs.get(i - 1) + " and " 
+ outputs.get(i));
+                return ecode;
+            }
+        }
+        return 0;
+    }
+
+    private static int overwriteResults(String inFileName, String outFileName) throws Exception {
+        // This method can be replaced with Files.copy(source, target, REPLACE_EXISTING)
+        // once Hive uses JAVA 7.
+        LOG.info("Overwriting results " + inFileName + " to " + outFileName);
+        return executeCmd(new String[]{
+                "cp",
+                getQuotedString(inFileName),
+                getQuotedString(outFileName)
+        });
+    }
+
+    private static int executeDiffCommand(String inFileName,
+                                          String outFileName,
+                                          boolean ignoreWhiteSpace,
+                                          boolean sortResults
+    ) throws Exception {
+
+        int result = 0;
+
+        if (sortResults) {
+            // sort will try to open the output file in write mode on windows. We need to
+            // close it first.
+            SessionState ss = SessionState.get();
+            if (ss != null && ss.out != null && ss.out != System.out) {
+                ss.out.close();
+            }
+
+            String inSorted = inFileName + SORT_SUFFIX;
+            String outSorted = outFileName + SORT_SUFFIX;
+
+            result = sortFiles(inFileName, inSorted);
+            result |= sortFiles(outFileName, outSorted);
+            if (result != 0) {
+                LOG.error("ERROR: Could not sort files before comparing");
+                return result;
+            }
+            inFileName = inSorted;
+            outFileName = outSorted;
+        }
+
+        ArrayList<String> diffCommandArgs = new ArrayList<String>();
+        diffCommandArgs.add("diff");
+
+        // Text file comparison
+        diffCommandArgs.add("-a");
+
+        // Ignore changes in the amount of white space
+        if (ignoreWhiteSpace || Shell.WINDOWS) {
+            diffCommandArgs.add("-b");
+        }
+
+        // Files created on Windows machines have different line endings
+        // than files created on Unix/Linux. Windows uses carriage return and line feed
+        // ("\r\n") as a line ending, whereas Unix uses just line feed ("\n").
+        // Also StringBuilder.toString() and Stream to String conversions add extra
+        // spaces at the end of the line.
+        if (Shell.WINDOWS) {
+            diffCommandArgs.add("--strip-trailing-cr"); // Strip trailing carriage return on input
+            diffCommandArgs.add("-B"); // Ignore changes whose lines are all blank
+        }
+        // Add files to compare to the arguments list
+        diffCommandArgs.add(getQuotedString(inFileName));
+        diffCommandArgs.add(getQuotedString(outFileName));
+
+        result = executeCmd(diffCommandArgs);
+
+        if (sortResults) {
+            new File(inFileName).delete();
+            new File(outFileName).delete();
+        }
+
+        return result;
+    }
+
+    private static int sortFiles(String in, String out) throws Exception {
+        return executeCmd(new String[]{
+                "sort",
+                getQuotedString(in),
+        }, out, null);
+    }
+
+    private static int executeCmd(Collection<String> args) throws Exception {
+        return executeCmd(args, null, null);
+    }
+
+    private static int executeCmd(String[] args) throws Exception {
+        return executeCmd(args, null, null);
+    }
+
+    private static int executeCmd(Collection<String> args, String outFile, String errFile) throws
+            Exception {
+        String[] cmdArray = args.toArray(new String[args.size()]);
+        return executeCmd(cmdArray, outFile, errFile);
+    }
+
+    private static int executeCmd(String[] args, String outFile, String errFile) throws Exception {
+        LOG.info("Running: " + org.apache.commons.lang.StringUtils.join(args, ' '));
+
+        PrintStream out = outFile == null ?
+                SessionState.getConsole().getChildOutStream() :
+                new PrintStream(new FileOutputStream(outFile), true);
+        PrintStream err = errFile == null ?
+                SessionState.getConsole().getChildErrStream() :
+                new PrintStream(new FileOutputStream(errFile), true);
+
+        Process executor = Runtime.getRuntime().exec(args);
+
+        StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, err);
+        StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, out);
+
+        outPrinter.start();
+        errPrinter.start();
+
+        int result = executor.waitFor();
+
+        outPrinter.join();
+        errPrinter.join();
+
+        if (outFile != null) {
+            out.close();
+        }
+
+        if (errFile != null) {
+            err.close();
+        }
+
+        return result;
+    }
+
+    private static String getQuotedString(String str) {
+        return Shell.WINDOWS ? String.format("\"%s\"", str) : str;
+    }
+
+    public ASTNode parseQuery(String tname) throws Exception {
+        return pd.parse(qMap.get(tname));
+    }
+
+    public void resetParser() throws SemanticException {
+        pd = new ParseDriver();
+        sem = new SemanticAnalyzer(conf);
+    }
+
+    public TreeMap<String, String> getQMap() {
+        return qMap;
+    }
+
+    /**
+     * HiveTestSetup defines test fixtures which are reused across testcases,
+     * and are needed before any test can be run
+     */
+    public static class HiveTestSetup {
+        private MiniZooKeeperCluster zooKeeperCluster = null;
+        private int zkPort;
+        private ZooKeeper zooKeeper;
+
+        public HiveTestSetup() {
+        }
+
+        public void preTest(HiveConf conf) throws Exception {
+
+            if (zooKeeperCluster == null) {
+                //create temp dir
+                String tmpBaseDir = System.getProperty("test.tmp.dir");
+                File tmpDir = Utilities.createTempDir(tmpBaseDir);
+
+                zooKeeperCluster = new MiniZooKeeperCluster();
+                zkPort = zooKeeperCluster.startup(tmpDir);
+            }
+
+            if (zooKeeper != null) {
+                zooKeeper.close();
+            }
+
+            int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars
+                    .HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+            zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() {
+                @Override
+                public void process(WatchedEvent arg0) {
+                }
+            });
+
+            String zkServer = "localhost";
+            conf.set("hive.zookeeper.quorum", zkServer);
+            conf.set("hive.zookeeper.client.port", "" + zkPort);
+        }
+
+        public void postTest(HiveConf conf) throws Exception {
+            if (zooKeeperCluster == null) {
+                return;
+            }
+
+            if (zooKeeper != null) {
+                zooKeeper.close();
+            }
+
+            ZooKeeperHiveLockManager.releaseAllLocks(conf);
+        }
+
+        public void tearDown() throws Exception {
+            if (zooKeeperCluster != null) {
+                zooKeeperCluster.shutdown();
+                zooKeeperCluster = null;
+            }
+        }
+    }
+
+    /**
+     * QTRunner: Runnable class for running a single query file.
+     **/
+    public static class HiveTestRunner implements Runnable {
+        private final HiveTestUtil qt;
+        private final String fname;
+
+        public HiveTestRunner(HiveTestUtil qt, String fname) {
+            this.qt = qt;
+            this.fname = fname;
+        }
+
+        @Override
+        public void run() {
+            try {
+                // assumption is that environment has already been cleaned once globally
+                // hence each thread does not call cleanUp() and createSources() again
+                qt.cliInit(fname, false);
+                qt.executeClient(fname);
+            } catch (Throwable e) {
+                LOG.error("Query file " + fname + " failed with exception ", 
e);
+                e.printStackTrace();
+                outputTestFailureHelpMessage();
+            }
+        }
+    }
+
+    /**
+     * Setup to execute a set of query files. Uses HiveTestUtil to do so.
+     *
+     * @param qfiles array of input query files containing arbitrary number of hive
+     *               queries
+     * @param resDir output directory
+     * @param logDir log directory
+     * @return one HiveTestUtil for each query file
+     */
+    public static HiveTestUtil[] queryListRunnerSetup(File[] qfiles, String resDir,
+                                                      String logDir) throws Exception {
+        HiveTestUtil[] qt = new HiveTestUtil[qfiles.length];
+        for (int i = 0; i < qfiles.length; i++) {
+            qt[i] = new HiveTestUtil(resDir, logDir, MiniClusterType.mr, null, "0.20");
+            qt[i].addFile(qfiles[i]);
+            qt[i].clearTestSideEffects();
+        }
+
+        return qt;
+    }
+
+    /**
+     * Executes a set of query files in sequence.
+     *
+     * @param qfiles array of input query files containing arbitrary number of hive
+     *               queries
+     * @param qt     array of HiveTestUtils, one per qfile
+     * @return true if all queries passed, false otherwise
+     */
+    public static boolean queryListRunnerSingleThreaded(File[] qfiles, HiveTestUtil[] qt)
+            throws Exception {
+        boolean failed = false;
+        qt[0].cleanUp();
+        qt[0].createSources();
+        for (int i = 0; i < qfiles.length && !failed; i++) {
+            qt[i].clearTestSideEffects();
+            qt[i].cliInit(qfiles[i].getName(), false);
+            qt[i].executeClient(qfiles[i].getName());
+            int ecode = qt[i].checkCliDriverResults(qfiles[i].getName());
+            if (ecode != 0) {
+                failed = true;
+                LOG.error("Test " + qfiles[i].getName()
+                        + " results check failed with error code " + ecode);
+                outputTestFailureHelpMessage();
+            }
+            qt[i].clearPostTestEffects();
+        }
+        return (!failed);
+    }
+
+    public static void outputTestFailureHelpMessage() {
+        LOG.error("See ./ql/target/tmp/log/hive.log or 
./itests/qtest/target/tmp/log/hive.log, "
+                + "or check ./ql/target/surefire-reports or " +
+                "./itests/qtest/target/surefire-reports/ for specific test 
cases logs.");
+    }
+
+    public static String ensurePathEndsInSlash(String path) {
+        if (path == null) {
+            throw new NullPointerException("Path cannot be null");
+        }
+        if (path.endsWith(File.separator)) {
+            return path;
+        } else {
+            return path + File.separator;
+        }
+    }
+
+    private static String[] cachedQvFileList = null;
+    private static ImmutableList<String> cachedDefaultQvFileList = null;
+    private static Pattern qvSuffix = Pattern.compile("_[0-9]+.qv$", Pattern.CASE_INSENSITIVE);
+
+    public static List<String> getVersionFiles(String queryDir, String tname) {
+        ensureQvFileList(queryDir);
+        List<String> result = getVersionFilesInternal(tname);
+        if (result == null) {
+            result = cachedDefaultQvFileList;
+        }
+        return result;
+    }
+
+    private static void ensureQvFileList(String queryDir) {
+        if (cachedQvFileList != null) return;
+        // Not thread-safe.
+        LOG.info("Getting versions from " + queryDir);
+        cachedQvFileList = (new File(queryDir)).list(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.toLowerCase().endsWith(".qv");
+            }
+        });
+        if (cachedQvFileList == null) return; // no files at all
+        Arrays.sort(cachedQvFileList, String.CASE_INSENSITIVE_ORDER);
+        List<String> defaults = getVersionFilesInternal("default");
+        cachedDefaultQvFileList = (defaults != null)
+                ? ImmutableList.copyOf(defaults) : ImmutableList.<String>of();
+    }
+
+    private static List<String> getVersionFilesInternal(String tname) {
+        if (cachedQvFileList == null) {
+            return new ArrayList<String>();
+        }
+        int pos = Arrays.binarySearch(cachedQvFileList, tname, String.CASE_INSENSITIVE_ORDER);
+        if (pos >= 0) {
+            throw new BuildException("Unexpected file list element: " + cachedQvFileList[pos]);
+        }
+        List<String> result = null;
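+        // binarySearch did not find an exact match, so -pos - 1 is the insertion point; scan candidate files from there.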
+        for (pos = (-pos - 1); pos < cachedQvFileList.length; ++pos) {
+            String candidate = cachedQvFileList[pos];
+            if (candidate.length() <= tname.length()
+                    || !tname.equalsIgnoreCase(candidate.substring(0, tname.length()))
+                    || !qvSuffix.matcher(candidate.substring(tname.length())).matches()) {
+                break;
+            }
+            if (result == null) {
+                result = new ArrayList<String>();
+            }
+            result.add(candidate);
+        }
+        return result;
+    }
+
+    public void failed(int ecode, String fname, String debugHint) {
+        String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null;
+        Assert.fail("Client Execution failed with error code = " + ecode +
+                (command != null ? " running " + command : "") + (debugHint != null ? debugHint :
+                ""));
+    }
+
+    // for negative tests that succeeded; no need to print the query string
+    public void failed(String fname, String debugHint) {
+        Assert.fail("Client Execution was expected to fail, but succeeded with error code 0 " +
+                (debugHint != null ? debugHint : ""));
+    }
+
+    public void failedDiff(int ecode, String fname, String debugHint) {
+        Assert.fail("Client Execution results failed with error code = " + 
ecode +
+                (debugHint != null ? debugHint : ""));
+    }
+
+    public void failed(Throwable e, String fname, String debugHint) {
+        String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null;
+        LOG.error("Exception: ", e);
+        e.printStackTrace();
+        LOG.error("Failed query: " + fname);
+        Assert.fail("Unexpected exception " +
+                org.apache.hadoop.util.StringUtils.stringifyException(e) + "\n" +
+                (command != null ? " running " + command : "") +
+                (debugHint != null ? debugHint : ""));
+    }
+
+    public static class WindowsPathUtil {
+
+        public static void convertPathsFromWindowsToHdfs(HiveConf conf) {
+            // The following local paths are used as HDFS paths in unit tests.
+            // That works well on Unix, as the path notation in Unix and HDFS is more or less the same.
+            // On Windows, however, the drive letter separator ':' and backslash '\' are invalid
+            // characters in HDFS, so we need to convert these local paths to HDFS paths before
+            // using them in unit tests.
+
+            String orgWarehouseDir = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+            conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, getHdfsUriString(orgWarehouseDir));
+
+            String orgTestTempDir = System.getProperty("test.tmp.dir");
+            System.setProperty("test.tmp.dir", getHdfsUriString(orgTestTempDir));
+
+            String orgTestWarehouseDir = System.getProperty("test.warehouse.dir");
+            System.setProperty("test.warehouse.dir", getHdfsUriString(orgTestWarehouseDir));
+
+            String orgScratchDir = conf.getVar(HiveConf.ConfVars.SCRATCHDIR);
+            conf.setVar(HiveConf.ConfVars.SCRATCHDIR, getHdfsUriString(orgScratchDir));
+        }
+
+        public static String getHdfsUriString(String uriStr) {
+            assert uriStr != null;
+            if (Shell.WINDOWS) {
+                // If the URI conversion is from Windows to HDFS then replace the '\' with '/'
+                // and remove the windows single drive letter & colon from absolute path.
+                return uriStr.replace('\\', '/')
+                        .replaceFirst("/[c-zC-Z]:", "/")
+                        .replaceFirst("^[c-zC-Z]:", "");
+            }
+            return uriStr;
+        }
+    }
+}
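
For reference, a minimal sketch of how the queryListRunnerSetup and queryListRunnerSingleThreaded helpers above are meant to be combined. The .q file names and result/log directories are hypothetical placeholders (not paths from this patch), and HiveTestUtil is assumed to be visible on the test classpath under org.apache.phoenix.hive:

    import java.io.File;

    import org.apache.phoenix.hive.HiveTestUtil;

    public class QFileDriverSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical query files containing Hive queries to run.
            File[] qfiles = new File[]{new File("queries/join1.q"), new File("queries/join2.q")};
            // One HiveTestUtil per query file, backed by the mr mini-cluster setup shown above.
            HiveTestUtil[] qt = HiveTestUtil.queryListRunnerSetup(qfiles, "target/qfile-results", "target/qfile-logs");
            // Runs each file in sequence and diffs its output against the expected results.
            if (!HiveTestUtil.queryListRunnerSingleThreaded(qfiles, qt)) {
                HiveTestUtil.outputTestFailureHelpMessage();
            }
        }
    }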

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java
new file mode 100644
index 0000000..d920517
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.hive.util.PhoenixUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation for notification methods which are invoked as part of transactions against the
+ * hive metastore, allowing Phoenix metadata to be kept in sync with Hive's metastore.
+ */
+public class PhoenixMetaHook implements HiveMetaHook {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixMetaHook.class);
+
+    @Override
+    public void preCreateTable(Table table) throws MetaException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Precreate  table : " + table.getTableName());
+        }
+
+        try (Connection conn = PhoenixConnectionUtil.getConnection(table)) {
+            String tableType = table.getTableType();
+            String tableName = PhoenixStorageHandlerUtil.getTargetTableName(table);
+
+            if (TableType.EXTERNAL_TABLE.name().equals(tableType)) {
+                // Check whether phoenix table exists.
+                if (!PhoenixUtil.existTable(conn, tableName)) {
+                    // Error if the phoenix table does not exist.
+                    throw new MetaException("Phoenix table " + tableName + " doesn't exist");
+                }
+            } else if (TableType.MANAGED_TABLE.name().equals(tableType)) {
+                // Check whether phoenix table exists.
+                if (PhoenixUtil.existTable(conn, tableName)) {
+                    // Error if the phoenix table already exists.
+                    throw new MetaException("Phoenix table " + tableName + " already exists.");
+                }
+
+                PhoenixUtil.createTable(conn, createTableStatement(table));
+            } else {
+                throw new MetaException("Unsupported table Type: " + 
table.getTableType());
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Phoenix table " + tableName + " was created");
+            }
+        } catch (SQLException e) {
+            throw new MetaException(e.getMessage());
+        }
+    }
+
+    private String createTableStatement(Table table) throws MetaException {
+        Map<String, String> tableParameterMap = table.getParameters();
+
+        String tableName = PhoenixStorageHandlerUtil.getTargetTableName(table);
+        StringBuilder ddl = new StringBuilder("create table 
").append(tableName).append(" (\n");
+
+        String phoenixRowKeys = tableParameterMap.get(PhoenixStorageHandlerConstants
+                .PHOENIX_ROWKEYS);
+        StringBuilder realRowKeys = new StringBuilder();
+        List<String> phoenixRowKeyList = Lists.newArrayList(Splitter.on
+                (PhoenixStorageHandlerConstants.COMMA).trimResults().split(phoenixRowKeys));
+        Map<String, String> columnMappingMap = getColumnMappingMap(tableParameterMap.get
+                (PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING));
+
+        List<FieldSchema> fieldSchemaList = table.getSd().getCols();
+        for (int i = 0, limit = fieldSchemaList.size(); i < limit; i++) {
+            FieldSchema fieldSchema = fieldSchemaList.get(i);
+            String fieldName = fieldSchema.getName();
+            String fieldType = fieldSchema.getType();
+            String columnType = PhoenixUtil.getPhoenixType(fieldType);
+
+            String rowKeyName = getRowKeyMapping(fieldName, phoenixRowKeyList);
+            if (rowKeyName != null) {
+                // In case of RowKey
+                if ("binary".equals(columnType)) {
+                    // Phoenix requires a max length for binary types in the type definition.
+                    // The length is taken from the mapping, e.g. phoenix.rowkeys = "r1, r2(100), ..."
+                    List<String> tokenList = Lists.newArrayList(Splitter.on(CharMatcher.is('(')
+                            .or(CharMatcher.is(')'))).trimResults().split(rowKeyName));
+                    columnType = columnType + "(" + tokenList.get(1) + ")";
+                    rowKeyName = tokenList.get(0);
+                }
+
+                ddl.append("  ").append(rowKeyName).append(" 
").append(columnType).append(" not " +
+                        "null,\n");
+                realRowKeys.append(rowKeyName).append(",");
+            } else {
+                // In case of Column
+                String columnName = columnMappingMap.get(fieldName);
+
+                if (columnName == null) {
+                    // Use field definition.
+                    columnName = fieldName;
+                }
+
+                if ("binary".equals(columnType)) {
+                    // Phoenix requires a max length for binary types in the type definition.
+                    // The length is taken from the column mapping, e.g. phoenix.column.mapping=c1:c1(100)
+                    List<String> tokenList = Lists.newArrayList(Splitter.on(CharMatcher.is('(')
+                            .or(CharMatcher.is(')'))).trimResults().split(columnName));
+                    columnType = columnType + "(" + tokenList.get(1) + ")";
+                    columnName = tokenList.get(0);
+                }
+
+                ddl.append("  ").append(columnName).append(" 
").append(columnType).append(",\n");
+            }
+        }
+        ddl.append("  ").append("constraint pk_").append(tableName).append(" 
primary key(")
+                .append(realRowKeys.deleteCharAt(realRowKeys.length() - 
1)).append(")\n)\n");
+
+        String tableOptions = 
tableParameterMap.get(PhoenixStorageHandlerConstants
+                .PHOENIX_TABLE_OPTIONS);
+        if (tableOptions != null) {
+            ddl.append(tableOptions);
+        }
+
+        String statement = ddl.toString();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("DDL : " + statement);
+        }
+
+        return statement;
+    }
+
+    private String getRowKeyMapping(String rowKeyName, List<String> phoenixRowKeyList) {
+        String rowKeyMapping = null;
+
+        for (String phoenixRowKey : phoenixRowKeyList) {
+            if (phoenixRowKey.equals(rowKeyName)) {
+                rowKeyMapping = phoenixRowKey;
+                break;
+            } else if (phoenixRowKey.startsWith(rowKeyName + "(") && phoenixRowKey.endsWith(")")) {
+                rowKeyMapping = phoenixRowKey;
+                break;
+            }
+        }
+
+        return rowKeyMapping;
+    }
+
+    private Map<String, String> getColumnMappingMap(String columnMappings) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Column mappings : " + columnMappings);
+        }
+
+        if (columnMappings == null) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("phoenix.column.mapping not set. using field 
definition");
+            }
+
+            return Collections.emptyMap();
+        }
+
+        Map<String, String> columnMappingMap = Splitter.on(PhoenixStorageHandlerConstants.COMMA)
+                .trimResults().withKeyValueSeparator(PhoenixStorageHandlerConstants.COLON).split
+                        (columnMappings);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Column mapping map : " + columnMappingMap);
+        }
+
+        return columnMappingMap;
+    }
+
+    @Override
+    public void rollbackCreateTable(Table table) throws MetaException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Rollback for table : " + table.getTableName());
+        }
+
+        dropTableIfExist(table);
+    }
+
+    @Override
+    public void commitCreateTable(Table table) throws MetaException {
+
+    }
+
+    @Override
+    public void preDropTable(Table table) throws MetaException {
+    }
+
+    @Override
+    public void rollbackDropTable(Table table) throws MetaException {
+    }
+
+    @Override
+    public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Commit drop table : " + table.getTableName());
+        }
+
+        dropTableIfExist(table);
+    }
+
+    private void dropTableIfExist(Table table) throws MetaException {
+        try (Connection conn = PhoenixConnectionUtil.getConnection(table)) {
+            String tableType = table.getTableType();
+            String tableName = PhoenixStorageHandlerUtil.getTargetTableName(table);
+
+            if (TableType.MANAGED_TABLE.name().equals(tableType)) {
+                // Drop if the phoenix table exists.
+                if (PhoenixUtil.existTable(conn, tableName)) {
+                    PhoenixUtil.dropTable(conn, tableName);
+                }
+            }
+        } catch (SQLException e) {
+            throw new MetaException(e.getMessage());
+        }
+    }
+}
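
For context on the column-mapping format consumed by getColumnMappingMap() above, here is a minimal standalone sketch of the same Guava parsing. It assumes PhoenixStorageHandlerConstants.COMMA and COLON resolve to "," and ":", and the sample mapping string is made up purely for illustration:

    import com.google.common.base.Splitter;

    import java.util.Map;

    public class ColumnMappingSketch {
        public static void main(String[] args) {
            // Hypothetical hive-column:phoenix-column pairs, comma separated.
            String columnMappings = "id:ID, first_name:FIRST_NAME, salary:SALARY";
            Map<String, String> columnMappingMap = Splitter.on(",")
                    .trimResults()
                    .withKeyValueSeparator(":")
                    .split(columnMappings);
            // Prints {id=ID, first_name=FIRST_NAME, salary=SALARY}
            System.out.println(columnMappingMap);
        }
    }

For a managed Hive table, preCreateTable() feeds this mapping, together with the phoenix.rowkeys property, into createTableStatement() to build the "create table ... constraint pk_... primary key(...)" DDL that is then executed against Phoenix.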
