PHOENIX-3346 Hive PhoenixStorageHandler doesn't work well with column mapping
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1ae7177d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1ae7177d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1ae7177d Branch: refs/heads/4.x-HBase-1.3 Commit: 1ae7177d8f7cf3928d71cf127e79f1880e10f0f7 Parents: 69d6aa0 Author: Sergey Soldatov <s...@apache.org> Authored: Wed Mar 1 11:51:46 2017 -0800 Committer: Sergey Soldatov <s...@apache.org> Committed: Wed Mar 1 11:57:35 2017 -0800 ---------------------------------------------------------------------- phoenix-hive/pom.xml | 13 + .../phoenix/hive/BaseHivePhoenixStoreIT.java | 165 ++++++++++ .../apache/phoenix/hive/HiveMapReduceIT.java | 32 ++ .../apache/phoenix/hive/HivePhoenixStoreIT.java | 330 ++++++++++--------- .../org/apache/phoenix/hive/HiveTestUtil.java | 22 +- .../java/org/apache/phoenix/hive/HiveTezIT.java | 32 ++ .../apache/phoenix/hive/PhoenixMetaHook.java | 37 +-- .../org/apache/phoenix/hive/PhoenixSerDe.java | 9 +- .../apache/phoenix/hive/PhoenixSerializer.java | 4 + .../phoenix/hive/PhoenixStorageHandler.java | 5 + .../hive/mapreduce/PhoenixInputFormat.java | 3 +- .../hive/mapreduce/PhoenixRecordReader.java | 1 + .../hive/mapreduce/PhoenixResultWritable.java | 12 +- .../phoenix/hive/query/PhoenixQueryBuilder.java | 76 ++++- .../phoenix/hive/util/ColumnMappingUtils.java | 76 +++++ .../hive/util/PhoenixConnectionUtil.java | 19 ++ .../hive/query/PhoenixQueryBuilderTest.java | 10 +- 17 files changed, 604 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-hive/pom.xml b/phoenix-hive/pom.xml index 2d0ef2b..d6ccdfe 100644 --- a/phoenix-hive/pom.xml +++ b/phoenix-hive/pom.xml @@ -110,6 +110,19 @@ <artifactId>hadoop-minicluster</artifactId> 
<scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tests</artifactId> + <scope>test</scope> + <version>0.8.4</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + <scope>test</scope> + <version>0.8.4</version> + </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/it/java/org/apache/phoenix/hive/BaseHivePhoenixStoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/BaseHivePhoenixStoreIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/BaseHivePhoenixStoreIT.java new file mode 100644 index 0000000..ac0a7fc --- /dev/null +++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/BaseHivePhoenixStoreIT.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hive; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.AfterClass; + +import java.io.File; +import java.io.IOException; +import java.sql.*; +import java.util.Properties; + +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Base class for all Hive Phoenix integration tests that may be run with Tez or MR mini cluster + */ +public class BaseHivePhoenixStoreIT { + + private static final Log LOG = LogFactory.getLog(BaseHivePhoenixStoreIT.class); + protected static HBaseTestingUtility hbaseTestUtil; + protected static MiniHBaseCluster hbaseCluster; + private static String zkQuorum; + protected static Connection conn; + private static Configuration conf; + protected static HiveTestUtil qt; + protected static String hiveOutputDir; + protected static String hiveLogDir; + + + public static void setup(HiveTestUtil.MiniClusterType clusterType)throws Exception { + String hadoopConfDir = System.getenv("HADOOP_CONF_DIR"); + if (null != hadoopConfDir && !hadoopConfDir.isEmpty()) { + LOG.warn("WARNING: HADOOP_CONF_DIR is set in the environment which may cause " + + "issues with test execution via MiniDFSCluster"); + } + hbaseTestUtil = new HBaseTestingUtility(); + conf = hbaseTestUtil.getConfiguration(); + setUpConfigForMiniCluster(conf); + 
conf.set(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + hiveOutputDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_output").toString(); + File outputDir = new File(hiveOutputDir); + outputDir.mkdirs(); + hiveLogDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_log").toString(); + File logDir = new File(hiveLogDir); + logDir.mkdirs(); + // Setup Hive mini Server + Path testRoot = hbaseTestUtil.getDataTestDir(); + System.setProperty("test.tmp.dir", testRoot.toString()); + System.setProperty("test.warehouse.dir", (new Path(testRoot, "warehouse")).toString()); + + try { + qt = new HiveTestUtil(hiveOutputDir, hiveLogDir, clusterType, null); + } catch (Exception e) { + LOG.error("Unexpected exception in setup", e); + fail("Unexpected exception in setup"); + } + + //Start HBase cluster + hbaseCluster = hbaseTestUtil.startMiniCluster(3); + MiniDFSCluster x = hbaseTestUtil.getDFSCluster(); + Class.forName(PhoenixDriver.class.getName()); + zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum, props); + // Setup Hive Output Folder + + Statement stmt = conn.createStatement(); + stmt.execute("create table t(a integer primary key,b varchar)"); + } + + protected void runTest(String fname, String fpath) throws Exception { + long startTime = System.currentTimeMillis(); + try { + LOG.info("Begin query: " + fname); + qt.addFile(fpath); + + if (qt.shouldBeSkipped(fname)) { + LOG.info("Test " + fname + " skipped"); + return; + } + + qt.cliInit(fname); + qt.clearTestSideEffects(); + int ecode = qt.executeClient(fname); + if (ecode != 0) { + qt.failed(ecode, fname, null); + return; + } + + ecode = qt.checkCliDriverResults(fname); + if (ecode != 0) { + qt.failedDiff(ecode, fname, null); 
+ } + qt.clearPostTestEffects(); + + } catch (Throwable e) { + qt.failed(e, fname, null); + } + + long elapsedTime = System.currentTimeMillis() - startTime; + LOG.info("Done query: " + fname + " elapsedTime=" + elapsedTime / 1000 + "s"); + assertTrue("Test passed", true); + } + + protected void createFile(String content, String fullName) throws IOException { + FileUtils.write(new File(fullName), content); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (qt != null) { + try { + qt.shutdown(); + } catch (Exception e) { + LOG.error("Unexpected exception in setup", e); + fail("Unexpected exception in tearDown"); + } + } + try { + conn.close(); + } finally { + try { + PhoenixDriver.INSTANCE.close(); + } finally { + try { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + } finally { + hbaseTestUtil.shutdownMiniCluster(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveMapReduceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveMapReduceIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveMapReduceIT.java new file mode 100644 index 0000000..7203597 --- /dev/null +++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveMapReduceIT.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.hive; + +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category(NeedsOwnMiniClusterTest.class) +public class HiveMapReduceIT extends HivePhoenixStoreIT { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + setup(HiveTestUtil.MiniClusterType.mr); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java index a707a06..cf12a80 100644 --- a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java +++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java @@ -17,99 +17,22 @@ */ package org.apache.phoenix.hive; -import org.apache.commons.io.FileUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.jdbc.PhoenixDriver; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.PhoenixRuntime; -import 
org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.StringUtil; -import org.apache.phoenix.util.TestUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; -import org.junit.experimental.categories.Category; -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.Statement; -import java.util.Properties; -import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** - * Test class to run all Hive Phoenix integration tests against a MINI Map-Reduce cluster. + * Test methods only. All supporting methods should be placed to BaseHivePhoenixStoreIT */ -@Category(NeedsOwnMiniClusterTest.class) -public class HivePhoenixStoreIT { - private static final Log LOG = LogFactory.getLog(HivePhoenixStoreIT.class); - private static HBaseTestingUtility hbaseTestUtil; - private static String zkQuorum; - private static Connection conn; - private static Configuration conf; - private static HiveTestUtil qt; - private static String hiveOutputDir; - private static String hiveLogDir; - - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - String hadoopConfDir = System.getenv("HADOOP_CONF_DIR"); - if (null != hadoopConfDir && !hadoopConfDir.isEmpty()) { - LOG.warn("WARNING: HADOOP_CONF_DIR is set in the environment which may cause " - + "issues with test execution via MiniDFSCluster"); - } - hbaseTestUtil = new HBaseTestingUtility(); - conf = hbaseTestUtil.getConfiguration(); - setUpConfigForMiniCluster(conf); - conf.set(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); - hiveOutputDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_output").toString(); - File outputDir = new File(hiveOutputDir); - outputDir.mkdirs(); - hiveLogDir = new 
Path(hbaseTestUtil.getDataTestDir(), "hive_log").toString(); - File logDir = new File(hiveLogDir); - logDir.mkdirs(); - // Setup Hive mini Server - Path testRoot = hbaseTestUtil.getDataTestDir(); - System.setProperty("test.tmp.dir", testRoot.toString()); - System.setProperty("test.warehouse.dir", (new Path(testRoot, "warehouse")).toString()); - - HiveTestUtil.MiniClusterType miniMR = HiveTestUtil.MiniClusterType.mr; - try { - qt = new HiveTestUtil(hiveOutputDir, hiveLogDir, miniMR, null); - } catch (Exception e) { - LOG.error("Unexpected exception in setup", e); - fail("Unexpected exception in setup"); - } - - //Start HBase cluster - hbaseTestUtil.startMiniCluster(3); - MiniDFSCluster x = hbaseTestUtil.getDFSCluster(); - - Class.forName(PhoenixDriver.class.getName()); - zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); - Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); - props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); - conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + - PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum, props); - // Setup Hive Output Folder - - Statement stmt = conn.createStatement(); - stmt.execute("create table t(a integer primary key,b varchar)"); - } +@Ignore("This class contains only test methods and should not be executed directly") +public class HivePhoenixStoreIT extends BaseHivePhoenixStoreIT { /** * Create a table with two column, insert 1 row, check that phoenix table is created and @@ -120,7 +43,6 @@ public class HivePhoenixStoreIT { @Test public void simpleTest() throws Exception { String testName = "simpleTest"; - // create a dummy outfile under log folder hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString()); @@ -129,9 +51,11 @@ public 
class HivePhoenixStoreIT { " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF + " 'phoenix.table.name'='phoenix_table'," + HiveTestUtil.CRLF + - " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF + - " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster() - .getClientPort() + "', 'phoenix.rowkeys'='id');"); + " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.client.port'='" + + hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id');"); sb.append("INSERT INTO TABLE phoenix_table" + HiveTestUtil.CRLF + "VALUES ('10', '1000');" + HiveTestUtil.CRLF); String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); @@ -145,9 +69,48 @@ public class HivePhoenixStoreIT { assertTrue(rs.next()); assert (rs.getString(1).equals("10")); assert (rs.getString(2).equals("1000")); + } + + /** + * Create hive table with custom column mapping + * @throws Exception + */ + + @Test + public void simpleColumnMapTest() throws Exception { + String testName = "cmTest"; + hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); + createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); + createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString()); + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE column_table(ID STRING, P1 STRING, p2 STRING)" + HiveTestUtil.CRLF + + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil + .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF + + " 'phoenix.table.name'='column_table'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF + + " 'phoenix.column.mapping' = 'id:C1, p1:c2, p2:C3'," + HiveTestUtil.CRLF + + " 
'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.client.port'='" + + hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id');"); + sb.append("INSERT INTO TABLE column_table" + HiveTestUtil.CRLF + + "VALUES ('1', '2', '3');" + HiveTestUtil.CRLF); + String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); + createFile(sb.toString(), fullPath); + runTest(testName, fullPath); + + String phoenixQuery = "SELECT C1, \"c2\", C3 FROM column_table"; + PreparedStatement statement = conn.prepareStatement(phoenixQuery); + ResultSet rs = statement.executeQuery(); + assert (rs.getMetaData().getColumnCount() == 3); + assertTrue(rs.next()); + assert (rs.getString(1).equals("1")); + assert (rs.getString(2).equals("2")); + assert (rs.getString(3).equals("3")); } + /** * Datatype Test * @@ -156,22 +119,22 @@ public class HivePhoenixStoreIT { @Test public void dataTypeTest() throws Exception { String testName = "dataTypeTest"; - // create a dummy outfile under log folder hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString()); StringBuilder sb = new StringBuilder(); - sb.append("CREATE TABLE phoenix_datatype(ID int, description STRING, ts TIMESTAMP, db " + + sb.append("CREATE TABLE phoenix_datatype(ID int, description STRING, ts TIMESTAMP, db " + "DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF + " 'phoenix.hbase.table.name'='phoenix_datatype'," + HiveTestUtil.CRLF + - " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF + - " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster() - .getClientPort() + "'," + 
HiveTestUtil.CRLF + + " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.client.port'='" + + hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF + " 'phoenix.rowkeys'='id');"); sb.append("INSERT INTO TABLE phoenix_datatype" + HiveTestUtil.CRLF + - "VALUES (10, \"foodesc\",\"2013-01-05 01:01:01\",200,2.0,-1);" + HiveTestUtil.CRLF); + "VALUES (10, \"foodesc\", \"2013-01-05 01:01:01\", 200,2.0,-1);" + HiveTestUtil.CRLF); String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); createFile(sb.toString(), fullPath); runTest(testName, fullPath); @@ -183,10 +146,6 @@ public class HivePhoenixStoreIT { while (rs.next()) { assert (rs.getInt(1) == 10); assert (rs.getString(2).equalsIgnoreCase("foodesc")); - /* Need a way how to correctly handle timestamp since Hive's implementation uses - time zone information but Phoenix doesn't. - */ - //assert(rs.getTimestamp(3).equals(Timestamp.valueOf("2013-01-05 02:01:01"))); assert (rs.getDouble(4) == 200); assert (rs.getFloat(5) == 2.0); assert (rs.getInt(6) == -1); @@ -201,23 +160,22 @@ public class HivePhoenixStoreIT { @Test public void MultiKey() throws Exception { String testName = "MultiKey"; - // create a dummy outfile under log folder hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString()); StringBuilder sb = new StringBuilder(); - sb.append("CREATE TABLE phoenix_MultiKey(ID int, ID2 String,description STRING, ts " + - "TIMESTAMP, db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + + sb.append("CREATE TABLE phoenix_MultiKey(ID int, ID2 String,description STRING," + + "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + " STORED BY 
\"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF + " 'phoenix.hbase.table.name'='phoenix_MultiKey'," + HiveTestUtil.CRLF + - " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF + - " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster() - .getClientPort() + "'," + HiveTestUtil.CRLF + - " 'phoenix.rowkeys'='id,id2');"); - sb.append("INSERT INTO TABLE phoenix_MultiKey" + HiveTestUtil.CRLF + - "VALUES (10, \"part2\",\"foodesc\",\"2013-01-05 01:01:01\",200,2.0,-1);" + + " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.client.port'='" + + hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF); + sb.append("INSERT INTO TABLE phoenix_MultiKey VALUES (10, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF); String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); createFile(sb.toString(), fullPath); @@ -226,78 +184,128 @@ public class HivePhoenixStoreIT { String phoenixQuery = "SELECT * FROM phoenix_MultiKey"; PreparedStatement statement = conn.prepareStatement(phoenixQuery); ResultSet rs = statement.executeQuery(); - assert (rs.getMetaData().getColumnCount() == 7); + assert (rs.getMetaData().getColumnCount() == 6); while (rs.next()) { assert (rs.getInt(1) == 10); assert (rs.getString(2).equalsIgnoreCase("part2")); assert (rs.getString(3).equalsIgnoreCase("foodesc")); - //assert(rs.getTimestamp(4).equals(Timestamp.valueOf("2013-01-05 02:01:01"))); - assert (rs.getDouble(5) == 200); - assert (rs.getFloat(6) == 2.0); - assert (rs.getInt(7) == -1); + assert (rs.getDouble(4) == 200); + assert (rs.getFloat(5) == 2.0); + assert (rs.getInt(6) == -1); } } + /** + * Test that hive is able to access Phoenix data during MR job (creating two tables and perform join on it) + * + * 
@throws Exception + */ + @Test + public void testJoinNoColumnMaps() throws Exception { + String testName = "testJoin"; + hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); + createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); + createFile("10\tpart2\tfoodesc\t200.0\t2.0\t-1\t10\tpart2\tfoodesc\t200.0\t2.0\t-1\n", + new Path(hiveOutputDir, testName + ".out").toString()); + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE joinTable1(ID int, ID2 String,description STRING," + + "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil + .CRLF + + " TBLPROPERTIES(" + HiveTestUtil.CRLF + + " 'phoenix.hbase.table.name'='joinTable1'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.client.port'='" + + hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF); + sb.append("CREATE TABLE joinTable2(ID int, ID2 String,description STRING," + + "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil + .CRLF + + " TBLPROPERTIES(" + HiveTestUtil.CRLF + + " 'phoenix.hbase.table.name'='joinTable2'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.client.port'='" + + hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF); + + sb.append("INSERT INTO TABLE joinTable1 VALUES (5, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF); + sb.append("INSERT INTO TABLE joinTable1 VALUES (10, \"part2\",\"foodesc\",200,2.0,-1);" + 
HiveTestUtil.CRLF); - private void runTest(String fname, String fpath) throws Exception { - long startTime = System.currentTimeMillis(); - try { - LOG.info("Begin query: " + fname); - qt.addFile(fpath); + sb.append("INSERT INTO TABLE joinTable2 VALUES (5, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF); + sb.append("INSERT INTO TABLE joinTable2 VALUES (10, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF); - if (qt.shouldBeSkipped(fname)) { - LOG.info("Test " + fname + " skipped"); - return; - } + sb.append("SELECT * from joinTable1 A join joinTable2 B on A.ID = B.ID WHERE A.ID=10;" + + HiveTestUtil.CRLF); - qt.cliInit(fname); - qt.clearTestSideEffects(); - int ecode = qt.executeClient(fname); - if (ecode != 0) { - qt.failed(ecode, fname, null); - } + String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); + createFile(sb.toString(), fullPath); + runTest(testName, fullPath); + } + + /** + * Test that hive is able to access Phoenix data during MR job (creating two tables and perform join on it) + * + * @throws Exception + */ + @Test + public void testJoinColumnMaps() throws Exception { + String testName = "testJoin"; + hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); + createFile("10\t200.0\tpart2\n", new Path(hiveOutputDir, testName + ".out").toString()); + createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); - ecode = qt.checkCliDriverResults(fname); - if (ecode != 0) { - qt.failedDiff(ecode, fname, null); - } - qt.clearPostTestEffects(); + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE joinTable3(ID int, ID2 String,description STRING," + + "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil + .CRLF + + " TBLPROPERTIES(" + HiveTestUtil.CRLF + + " 'phoenix.hbase.table.name'='joinTable3'," + HiveTestUtil.CRLF + + " 
'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.client.port'='" + + hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.column.mapping' = 'id:i1, id2:I2'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF); + sb.append("CREATE TABLE joinTable4(ID int, ID2 String,description STRING," + + "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil + .CRLF + + " TBLPROPERTIES(" + HiveTestUtil.CRLF + + " 'phoenix.hbase.table.name'='joinTable4'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.client.port'='" + + hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.column.mapping' = 'id:i1, id2:I2'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF); - } catch (Throwable e) { - qt.failed(e, fname, null); - } + sb.append("INSERT INTO TABLE joinTable3 VALUES (5, \"part1\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF); + sb.append("INSERT INTO TABLE joinTable3 VALUES (10, \"part1\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF); - long elapsedTime = System.currentTimeMillis() - startTime; - LOG.info("Done query: " + fname + " elapsedTime=" + elapsedTime / 1000 + "s"); - assertTrue("Test passed", true); - } + sb.append("INSERT INTO TABLE joinTable4 VALUES (5, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF); + sb.append("INSERT INTO TABLE joinTable4 VALUES (10, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF); - private void createFile(String content, String fullName) throws IOException { - FileUtils.write(new File(fullName), content); - } + sb.append("SELECT A.ID, a.db, B.ID2 from joinTable3 A join joinTable4 B on A.ID = B.ID 
WHERE A.ID=10;" + + HiveTestUtil.CRLF); - @AfterClass - public static void tearDownAfterClass() throws Exception { - if (qt != null) { - try { - qt.shutdown(); - } catch (Exception e) { - LOG.error("Unexpected exception in setup", e); - fail("Unexpected exception in tearDown"); - } - } - try { - conn.close(); - } finally { - try { - PhoenixDriver.INSTANCE.close(); - } finally { - try { - DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); - } finally { - hbaseTestUtil.shutdownMiniCluster(); - } - } + String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); + createFile(sb.toString(), fullPath); + runTest(testName, fullPath); + //Test that Phoenix has correctly mapped columns. We are checking both, primary key and + // regular columns mapped and not mapped + String phoenixQuery = "SELECT \"i1\", \"I2\", \"db\" FROM joinTable3 where \"i1\" = 10 AND \"I2\" = 'part1' AND \"db\" = 200"; + PreparedStatement statement = conn.prepareStatement(phoenixQuery); + ResultSet rs = statement.executeQuery(); + assert (rs.getMetaData().getColumnCount() == 3); + while (rs.next()) { + assert (rs.getInt(1) == 10); + assert (rs.getString(2).equalsIgnoreCase("part1")); + assert (rs.getDouble(3) == 200); } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java index 3407ffb..f5823ea 100644 --- a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java +++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java @@ -691,6 +691,7 @@ public class HiveTestUtil { } public int executeClient(String tname) { + conf.set("mapreduce.job.name", "test"); return cliDriver.processLine(getCommands(tname), false); } @@ -1110,27 +1111,6 @@ public class 
HiveTestUtil { } /** - * Setup to execute a set of query files. Uses HiveTestUtil to do so. - * - * @param qfiles array of input query files containing arbitrary number of hive - * queries - * @param resDir output directory - * @param logDir log directory - * @return one HiveTestUtil for each query file - */ - public static HiveTestUtil[] queryListRunnerSetup(File[] qfiles, String resDir, - String logDir) throws Exception { - HiveTestUtil[] qt = new HiveTestUtil[qfiles.length]; - for (int i = 0; i < qfiles.length; i++) { - qt[i] = new HiveTestUtil(resDir, logDir, MiniClusterType.mr, null, "0.20"); - qt[i].addFile(qfiles[i]); - qt[i].clearTestSideEffects(); - } - - return qt; - } - - /** * Executes a set of query files in sequence. * * @param qfiles array of input query files containing arbitrary number of hive http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTezIT.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTezIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTezIT.java new file mode 100644 index 0000000..a675a0e --- /dev/null +++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTezIT.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.hive; + +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category(NeedsOwnMiniClusterTest.class) +public class HiveTezIT extends HivePhoenixStoreIT { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + setup(HiveTestUtil.MiniClusterType.tez); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java index ae3675f..c35634a 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java @@ -35,9 +35,12 @@ import org.apache.phoenix.hive.util.PhoenixUtil; import java.sql.Connection; import java.sql.SQLException; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import static org.apache.phoenix.hive.util.ColumnMappingUtils.getColumnMappingMap; + /** * Implementation for notification methods which are invoked as part of transactions against the * hive metastore,allowing Phoenix metadata to be kept in sync with Hive'smetastore. 
@@ -105,6 +108,10 @@ public class PhoenixMetaHook implements HiveMetaHook { String rowKeyName = getRowKeyMapping(fieldName, phoenixRowKeyList); if (rowKeyName != null) { + String columnName = columnMappingMap.get(fieldName); + if(columnName != null) { + rowKeyName = columnName; + } // In case of RowKey if ("binary".equals(columnType)) { // Phoenix must define max length of binary when type definition. Obtaining @@ -115,9 +122,9 @@ public class PhoenixMetaHook implements HiveMetaHook { rowKeyName = tokenList.get(0); } - ddl.append(" ").append(rowKeyName).append(" ").append(columnType).append(" not " + + ddl.append(" ").append("\"").append(rowKeyName).append("\"").append(" ").append(columnType).append(" not " + "null,\n"); - realRowKeys.append(rowKeyName).append(","); + realRowKeys.append("\"").append(rowKeyName).append("\","); } else { // In case of Column String columnName = columnMappingMap.get(fieldName); @@ -136,7 +143,7 @@ public class PhoenixMetaHook implements HiveMetaHook { columnName = tokenList.get(0); } - ddl.append(" ").append(columnName).append(" ").append(columnType).append(",\n"); + ddl.append(" ").append("\"").append(columnName).append("\"").append(" ").append(columnType).append(",\n"); } } ddl.append(" ").append("constraint pk_").append(PhoenixUtil.getTableSchema(tableName.toUpperCase())[1]).append(" primary key(") @@ -173,30 +180,6 @@ public class PhoenixMetaHook implements HiveMetaHook { return rowKeyMapping; } - private Map<String, String> getColumnMappingMap(String columnMappings) { - if (LOG.isDebugEnabled()) { - LOG.debug("Column mappings : " + columnMappings); - } - - if (columnMappings == null) { - if (LOG.isInfoEnabled()) { - LOG.info("phoenix.column.mapping not set. 
using field definition"); - } - - return Collections.emptyMap(); - } - - Map<String, String> columnMappingMap = Splitter.on(PhoenixStorageHandlerConstants.COMMA) - .trimResults().withKeyValueSeparator(PhoenixStorageHandlerConstants.COLON).split - (columnMappings); - - if (LOG.isDebugEnabled()) { - LOG.debug("Column mapping map : " + columnMappingMap); - } - - return columnMappingMap; - } - @Override public void rollbackCreateTable(Table table) throws MetaException { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java index dd38cfb..9ef0158 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java @@ -84,14 +84,7 @@ public class PhoenixSerDe extends AbstractSerDe { } serializer = new PhoenixSerializer(conf, tbl); - row = new PhoenixRow(Lists.transform(serdeParams.getColumnNames(), new Function<String, - String>() { - - @Override - public String apply(String input) { - return input.toUpperCase(); - } - })); + row = new PhoenixRow(serdeParams.getColumnNames()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java index e43ed0e..852407a 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java @@ -63,6 +63,10 @@ public 
class PhoenixSerializer { private PhoenixResultWritable pResultWritable; public PhoenixSerializer(Configuration config, Properties tbl) throws SerDeException { + String mapping = tbl.getProperty(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING, null); + if(mapping!=null ) { + config.set(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING, mapping); + } try (Connection conn = PhoenixConnectionUtil.getInputConnection(config, tbl)) { List<ColumnInfo> columnMetadata = PhoenixUtil.getColumnInfoList(conn, tbl.getProperty (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java index a425b7c..ae8f242 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java @@ -194,6 +194,11 @@ public class PhoenixStorageHandler extends DefaultStorageHandler implements jobProperties.put(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT, tableProperties .getProperty(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT, PhoenixStorageHandlerConstants.DEFAULT_ZOOKEEPER_PARENT)); + String columnMapping = tableProperties + .getProperty(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING); + if(columnMapping != null) { + jobProperties.put(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING, columnMapping); + } jobProperties.put(hive_metastoreConstants.META_TABLE_STORAGE, this.getClass().getName()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java 
---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 9ebc3d6..f0a5dd6 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -91,7 +91,6 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri String query; String executionEngine = jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue()); - if (LOG.isDebugEnabled()) { LOG.debug("Target table name at split phase : " + tableName + "with whereCondition :" + jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR) + @@ -151,7 +150,7 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri setScanCacheSize(jobConf); // Adding Localization - HConnection connection = HConnectionManager.createConnection(jobConf); + HConnection connection = HConnectionManager.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)); RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan .getTableRef().getTable().getPhysicalName().toString())); RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java index 5cdf234..ca27686 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java +++ 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java @@ -71,6 +71,7 @@ public class PhoenixRecordReader<T extends DBWritable> implements private PhoenixResultSet resultSet; private long readCount; + private boolean isTransactional; public PhoenixRecordReader(Class<T> inputClass, final Configuration configuration, final http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java index 18ded89..2bdc7b2 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java @@ -27,6 +27,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.phoenix.hive.PhoenixRowKey; import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.util.ColumnMappingUtils; import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; import org.apache.phoenix.hive.util.PhoenixUtil; import org.apache.phoenix.util.ColumnInfo; @@ -52,6 +53,7 @@ public class PhoenixResultWritable implements Writable, DBWritable, Configurable private List<ColumnInfo> columnMetadataList; private List<Object> valueList; // for output private Map<String, Object> rowMap = Maps.newHashMap(); // for input + private Map<String, String> columnMap; private int columnCount = -1; @@ -71,7 +73,6 @@ public class PhoenixResultWritable implements Writable, DBWritable, Configurable throws IOException { this(config); this.columnMetadataList = columnMetadataList; - valueList = 
Lists.newArrayListWithExpectedSize(columnMetadataList.size()); } @@ -158,8 +159,12 @@ public class PhoenixResultWritable implements Writable, DBWritable, Configurable for (int i = 0; i < columnCount; i++) { Object value = resultSet.getObject(i + 1); - - rowMap.put(rsmd.getColumnName(i + 1), value); + String columnName = rsmd.getColumnName(i + 1); + String mapName = columnMap.get(columnName); + if(mapName != null) { + columnName = mapName; + } + rowMap.put(columnName, value); } // Adding row__id column. @@ -195,6 +200,7 @@ public class PhoenixResultWritable implements Writable, DBWritable, Configurable @Override public void setConf(Configuration conf) { config = conf; + this.columnMap = ColumnMappingUtils.getReverseColumnMapping(config.get(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING,"")); isTransactional = PhoenixStorageHandlerUtil.isTransactionalTable(config); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java index ebc5fc0..210a377 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java @@ -42,10 +42,13 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.mapred.JobConf; import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; import org.apache.phoenix.hive.ql.index.IndexSearchCondition; +import org.apache.phoenix.hive.util.ColumnMappingUtils; import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; import org.apache.phoenix.hive.util.PhoenixUtil; import org.apache.phoenix.util.StringUtil; +import static 
org.apache.phoenix.hive.util.ColumnMappingUtils.getColumnMappingMap; + /** * Query builder. Produces a query depending on the colummn list and conditions */ @@ -91,13 +94,16 @@ public class PhoenixQueryBuilder { TypeInfo> columnTypeMap) throws IOException { StringBuilder sql = new StringBuilder(); List<String> conditionColumnList = buildWhereClause(jobConf, sql, whereClause, columnTypeMap); + readColumnList = replaceColumns(jobConf, readColumnList); if (conditionColumnList.size() > 0) { addConditionColumnToReadColumn(readColumnList, conditionColumnList); + readColumnList = ColumnMappingUtils.quoteColumns(readColumnList); sql.insert(0, queryTemplate.replace("$HINT$", hints).replace("$COLUMN_LIST$", getSelectColumns(jobConf, tableName, readColumnList)).replace("$TABLE_NAME$", tableName)); } else { + readColumnList = ColumnMappingUtils.quoteColumns(readColumnList); sql.append(queryTemplate.replace("$HINT$", hints).replace("$COLUMN_LIST$", getSelectColumns(jobConf, tableName, readColumnList)).replace("$TABLE_NAME$", tableName)); @@ -110,18 +116,46 @@ public class PhoenixQueryBuilder { return sql.toString(); } + private static String findReplacement(JobConf jobConf, String column) { + Map<String, String> columnMappingMap = getColumnMappingMap(jobConf.get + (PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING)); + if (columnMappingMap != null && columnMappingMap.containsKey(column)) { + return columnMappingMap.get(column); + } else { + return column; + } + } + private static List<String> replaceColumns(JobConf jobConf, List<String> columnList) { + Map<String, String> columnMappingMap = getColumnMappingMap(jobConf.get + (PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING)); + if(columnMappingMap != null) { + List<String> newList = Lists.newArrayList(); + for(String column:columnList) { + if(columnMappingMap.containsKey(column)) { + newList.add(columnMappingMap.get(column)); + } else { + newList.add(column); + } + } + return newList; + } + return null; + } + private 
String makeQueryString(JobConf jobConf, String tableName, List<String> readColumnList, List<IndexSearchCondition> searchConditions, String queryTemplate, String hints) throws IOException { StringBuilder query = new StringBuilder(); - List<String> conditionColumnList = buildWhereClause(query, searchConditions); + List<String> conditionColumnList = buildWhereClause(jobConf, query, searchConditions); if (conditionColumnList.size() > 0) { + readColumnList = replaceColumns(jobConf, readColumnList); addConditionColumnToReadColumn(readColumnList, conditionColumnList); query.insert(0, queryTemplate.replace("$HINT$", hints).replace("$COLUMN_LIST$", getSelectColumns(jobConf, tableName, readColumnList)).replace("$TABLE_NAME$", tableName)); } else { + readColumnList = replaceColumns(jobConf, readColumnList); query.append(queryTemplate.replace("$HINT$", hints).replace("$COLUMN_LIST$", getSelectColumns(jobConf, tableName, readColumnList)).replace("$TABLE_NAME$", tableName)); @@ -136,7 +170,7 @@ public class PhoenixQueryBuilder { private String getSelectColumns(JobConf jobConf, String tableName, List<String> readColumnList) throws IOException { - String selectColumns = Joiner.on(PhoenixStorageHandlerConstants.COMMA).join(readColumnList); + String selectColumns = Joiner.on(PhoenixStorageHandlerConstants.COMMA).join(ColumnMappingUtils.quoteColumns(readColumnList)); if (PhoenixStorageHandlerConstants.EMPTY_STRING.equals(selectColumns)) { selectColumns = "*"; @@ -146,10 +180,8 @@ public class PhoenixQueryBuilder { StringBuilder pkColumns = new StringBuilder(); for (String pkColumn : pkColumnList) { - String pkColumnName = pkColumn.toLowerCase(); - - if (!readColumnList.contains(pkColumnName)) { - pkColumns.append(pkColumnName).append(PhoenixStorageHandlerConstants.COMMA); + if (!readColumnList.contains(pkColumn)) { + pkColumns.append("\"").append(pkColumn).append("\"" + PhoenixStorageHandlerConstants.COMMA); } } @@ -218,7 +250,10 @@ public class PhoenixQueryBuilder { for (String 
columnName : columnTypeMap.keySet()) { if (whereClause.contains(columnName)) { - conditionColumnList.add(columnName); + String column = findReplacement(jobConf, columnName); + whereClause = StringUtils.replaceEach(whereClause, new String[] {columnName}, new String[] {"\""+column + "\""}); + conditionColumnList.add(column); + if (PhoenixStorageHandlerConstants.DATE_TYPE.equals( columnTypeMap.get(columnName).getTypeName())) { @@ -617,7 +652,7 @@ public class PhoenixQueryBuilder { return itsMine; } - protected List<String> buildWhereClause(StringBuilder sql, + protected List<String> buildWhereClause(JobConf jobConf, StringBuilder sql, List<IndexSearchCondition> conditions) throws IOException { if (conditions == null || conditions.size() == 0) { @@ -628,21 +663,27 @@ public class PhoenixQueryBuilder { sql.append(" where "); Iterator<IndexSearchCondition> iter = conditions.iterator(); - appendExpression(sql, iter.next(), columns); + appendExpression(jobConf, sql, iter.next(), columns); while (iter.hasNext()) { sql.append(" and "); - appendExpression(sql, iter.next(), columns); + appendExpression(jobConf, sql, iter.next(), columns); } return columns; } - private void appendExpression(StringBuilder sql, IndexSearchCondition condition, + private void appendExpression(JobConf jobConf, StringBuilder sql, IndexSearchCondition condition, List<String> columns) { Expression expr = findExpression(condition); if (expr != null) { - sql.append(expr.buildExpressionStringFrom(condition)); - columns.add(condition.getColumnDesc().getColumn()); + sql.append(expr.buildExpressionStringFrom(jobConf, condition)); + String column = condition.getColumnDesc().getColumn(); + String rColumn = findReplacement(jobConf, column); + if(rColumn != null) { + column = rColumn; + } + + columns.add(column); } } @@ -719,10 +760,15 @@ public class PhoenixQueryBuilder { return condition.getComparisonOp().endsWith(hiveCompOp) && checkCondition(condition); } - public String 
buildExpressionStringFrom(IndexSearchCondition condition) { + public String buildExpressionStringFrom(JobConf jobConf, IndexSearchCondition condition) { final String type = condition.getColumnDesc().getTypeString(); + String column = condition.getColumnDesc().getColumn(); + String rColumn = findReplacement(jobConf, column); + if(rColumn != null) { + column = rColumn; + } return JOINER_SPACE.join( - condition.getColumnDesc().getColumn(), + "\"" + column + "\"", getSqlCompOpString(condition), joiner != null ? createConstants(type, condition.getConstantDescs()) : createConstant(type, condition.getConstantDesc())); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/ColumnMappingUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/ColumnMappingUtils.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/ColumnMappingUtils.java new file mode 100644 index 0000000..f348c0f --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/ColumnMappingUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.hive.util; + +import com.google.common.base.Splitter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; + +import java.util.*; + + +/** + * Util class for mapping between Hive and Phoenix column names + */ +public class ColumnMappingUtils { + + private static final Log LOG = LogFactory.getLog(ColumnMappingUtils.class); + + public static Map<String, String> getColumnMappingMap(String columnMappings) { + if (LOG.isDebugEnabled()) { + LOG.debug("Column mappings : " + columnMappings); + } + + if (columnMappings == null || columnMappings.length() == 0) { + if (LOG.isInfoEnabled()) { + LOG.info("phoenix.column.mapping not set. using field definition"); + } + + return Collections.emptyMap(); + } + + Map<String, String> columnMappingMap = Splitter.on(PhoenixStorageHandlerConstants.COMMA) + .trimResults().withKeyValueSeparator(PhoenixStorageHandlerConstants.COLON).split + (columnMappings); + + if (LOG.isDebugEnabled()) { + LOG.debug("Column mapping map : " + columnMappingMap); + } + + return columnMappingMap; + } + + public static Map<String, String> getReverseColumnMapping(String columnMapping) { + Map<String, String> myNewHashMap = new LinkedHashMap<>(); + Map<String, String> forward = getColumnMappingMap(columnMapping); + for(Map.Entry<String, String> entry : forward.entrySet()){ + myNewHashMap.put(entry.getValue(), entry.getKey()); + } + return myNewHashMap; + } + + public static List<String> quoteColumns(List<String> readColumnList) { + List<String> newList = new LinkedList<>(); + for(String column : readColumnList) { + newList.add("\""+ column + "\""); + } + return newList; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java ---------------------------------------------------------------------- diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java index 51f6c7e..b32419a 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java @@ -20,7 +20,10 @@ package org.apache.phoenix.hive.util; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapred.JobConf; import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -94,4 +97,20 @@ public class PhoenixConnectionUtil { clientPort, zNodeParent) : QueryUtil.getUrl(quorum), props); } + public static Configuration getConfiguration(JobConf jobConf) { + Configuration conf = new Configuration(jobConf); + String quorum = conf.get(PhoenixStorageHandlerConstants.ZOOKEEPER_QUORUM); + if(quorum!=null) { + conf.set(HConstants.ZOOKEEPER_QUORUM, quorum); + } + int zooKeeperClientPort = conf.getInt(PhoenixStorageHandlerConstants.ZOOKEEPER_PORT, 0); + if(zooKeeperClientPort != 0) { + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zooKeeperClientPort); + } + String zNodeParent = conf.get(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT); + if(zNodeParent != null) { + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zNodeParent); + } + return conf; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1ae7177d/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java ---------------------------------------------------------------------- diff --git 
a/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java b/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java index 1dc6e25..e4f872e 100644 --- a/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java +++ b/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java @@ -76,8 +76,8 @@ public class PhoenixQueryBuilderTest { public void testBuildQueryWithCharColumns() throws IOException { final String COLUMN_CHAR = "Column_Char"; final String COLUMN_VARCHAR = "Column_VChar"; - final String expectedQueryPrefix = "select /*+ NO_CACHE */ " + COLUMN_CHAR + "," + COLUMN_VARCHAR + - " from TEST_TABLE where "; + final String expectedQueryPrefix = "select /*+ NO_CACHE */ \"" + COLUMN_CHAR + "\",\"" + COLUMN_VARCHAR + + "\" from TEST_TABLE where "; JobConf jobConf = new JobConf(); List<String> readColumnList = Lists.newArrayList(COLUMN_CHAR, COLUMN_VARCHAR); @@ -86,7 +86,7 @@ public class PhoenixQueryBuilderTest { mockedIndexSearchCondition("GenericUDFOPEqual", "CHAR_VALUE2", null, COLUMN_VARCHAR, "varchar(10)", false) ); - assertEquals(expectedQueryPrefix + "Column_Char = 'CHAR_VALUE' and Column_VChar = 'CHAR_VALUE2'", + assertEquals(expectedQueryPrefix + "\"Column_Char\" = 'CHAR_VALUE' and \"Column_VChar\" = 'CHAR_VALUE2'", BUILDER.buildQuery(jobConf, TABLE_NAME, readColumnList, searchConditions)); searchConditions = Lists.newArrayList( @@ -94,7 +94,7 @@ public class PhoenixQueryBuilderTest { new Object[]{"CHAR1", "CHAR2", "CHAR3"}, COLUMN_CHAR, "char(10)", false) ); - assertEquals(expectedQueryPrefix + "Column_Char in ('CHAR1', 'CHAR2', 'CHAR3')", + assertEquals(expectedQueryPrefix + "\"Column_Char\" in ('CHAR1', 'CHAR2', 'CHAR3')", BUILDER.buildQuery(jobConf, TABLE_NAME, readColumnList, searchConditions)); searchConditions = Lists.newArrayList( @@ -110,7 +110,7 @@ public class PhoenixQueryBuilderTest { new Object[]{"CHAR1", "CHAR2"}, COLUMN_CHAR, 
"char(10)", false) ); - assertEquals(expectedQueryPrefix + "Column_Char between 'CHAR1' and 'CHAR2'", + assertEquals(expectedQueryPrefix + "\"Column_Char\" between 'CHAR1' and 'CHAR2'", BUILDER.buildQuery(jobConf, TABLE_NAME, readColumnList, searchConditions)); searchConditions = Lists.newArrayList(