Author: khorgath Date: Tue Aug 19 22:41:10 2014 New Revision: 1619005 URL: http://svn.apache.org/r1619005 Log: HIVE-7068 : Integrate AccumuloStorageHandler (Josh Elser, reviewed by Navis, Nick Dimiduk & Sushanth Sowmyan)
Added: hive/trunk/accumulo-handler/ hive/trunk/accumulo-handler/pom.xml hive/trunk/accumulo-handler/src/ hive/trunk/accumulo-handler/src/java/ hive/trunk/accumulo-handler/src/java/org/ hive/trunk/accumulo-handler/src/java/org/apache/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloMap.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloRow.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapping.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java 
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java 
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java 
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/TooManyAccumuloColumnsException.java hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/TooManyHiveColumnsException.java hive/trunk/accumulo-handler/src/test/ hive/trunk/accumulo-handler/src/test/org/ hive/trunk/accumulo-handler/src/test/org/apache/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloHiveRow.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestLazyAccumuloMap.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestLazyAccumuloRow.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestColumnEncoding.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestColumnMapper.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestColumnMappingFactory.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestHiveAccumuloColumnMapping.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestHiveRowIdColumnMapping.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java 
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTypes.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestPrimitiveComparisonFilter.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/TestDoubleCompare.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/TestIntCompare.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/TestLongComparison.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/TestStringCompare.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/ hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/DelimitedAccumuloRowIdFactory.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/FirstCharAccumuloCompositeRowId.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/TestAccumuloRowSerializer.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/TestAccumuloSerDe.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/TestAccumuloSerDeParameters.java hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/TestDefaultAccumuloRowIdFactory.java hive/trunk/accumulo-handler/src/test/queries/ hive/trunk/accumulo-handler/src/test/queries/positive/ hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_custom_key.q 
hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_custom_key2.q hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_joins.q hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_predicate_pushdown.q hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_queries.q hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_single_sourced_multi_insert.q hive/trunk/accumulo-handler/src/test/results/ hive/trunk/accumulo-handler/src/test/results/positive/ hive/trunk/accumulo-handler/src/test/results/positive/accumulo_custom_key.q.out hive/trunk/accumulo-handler/src/test/results/positive/accumulo_custom_key2.q.out hive/trunk/accumulo-handler/src/test/results/positive/accumulo_joins.q.out hive/trunk/accumulo-handler/src/test/results/positive/accumulo_predicate_pushdown.q.out hive/trunk/accumulo-handler/src/test/results/positive/accumulo_queries.q.out hive/trunk/accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out hive/trunk/accumulo-handler/src/test/templates/ hive/trunk/accumulo-handler/src/test/templates/TestAccumuloCliDriver.vm hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/accumulo/ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloQTestUtil.java hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloTestSetup.java Modified: hive/trunk/itests/qtest/pom.xml hive/trunk/itests/util/pom.xml hive/trunk/packaging/pom.xml hive/trunk/pom.xml Added: hive/trunk/accumulo-handler/pom.xml URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/pom.xml?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/pom.xml (added) +++ hive/trunk/accumulo-handler/pom.xml Tue Aug 19 22:41:10 2014 @@ -0,0 +1,158 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance 
with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hive</groupId> + <artifactId>hive</artifactId> + <version>0.14.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>hive-accumulo-handler</artifactId> + <packaging>jar</packaging> + <name>Hive Accumulo Handler</name> + + <properties> + <hive.path.to.root>..</hive.path.to.root> + </properties> + + <dependencies> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-fate</artifactId> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-start</artifactId> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-trace</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + 
<version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-service</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-shims</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>hadoop-1</id> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <version>${hadoop-20S.version}</version> + <optional>true</optional> + </dependency> + </dependencies> + </profile> + <profile> + <id>hadoop-2</id> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop-23.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop-23.version}</version> + <optional>true</optional> + </dependency> + </dependencies> + </profile> + </profiles> + + <build> + <sourceDirectory>${basedir}/src/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test</testSourceDirectory> + <plugins> + <plugin> 
+ <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +/** + * + */ +public class AccumuloConnectionParameters { + public static final String USER_NAME = "accumulo.user.name"; + public static final String USER_PASS = "accumulo.user.pass"; + public static final String ZOOKEEPERS = "accumulo.zookeepers"; + public static final String INSTANCE_NAME = "accumulo.instance.name"; + public static final String TABLE_NAME = "accumulo.table.name"; + + public static final String USE_MOCK_INSTANCE = "accumulo.mock.instance"; + + protected Configuration conf; + protected boolean useMockInstance = false; + + public AccumuloConnectionParameters(Configuration conf) { + // TableDesc#getDeserializer will ultimately instantiate the AccumuloSerDe with a null + // Configuration + // We have to accept this and just fail late if data is attempted to be pulled from the + // Configuration + this.conf = conf; + } + + public Configuration getConf() { + return conf; + } + + public String getAccumuloUserName() { + Preconditions.checkNotNull(conf); + return conf.get(USER_NAME); + } + + public String getAccumuloPassword() { + Preconditions.checkNotNull(conf); + return conf.get(USER_PASS); + } + + public String getAccumuloInstanceName() { + Preconditions.checkNotNull(conf); + return conf.get(INSTANCE_NAME); + } + + public String getZooKeepers() { + Preconditions.checkNotNull(conf); + return conf.get(ZOOKEEPERS); + } + + public String getAccumuloTableName() { + Preconditions.checkNotNull(conf); + 
return conf.get(TABLE_NAME); + } + + public boolean useMockInstance() { + Preconditions.checkNotNull(conf); + return conf.getBoolean(USE_MOCK_INSTANCE, false); + } + + public Instance getInstance() { + String instanceName = getAccumuloInstanceName(); + + // Fail with a good message + if (null == instanceName) { + throw new IllegalArgumentException("Accumulo instance name must be provided in hiveconf using " + INSTANCE_NAME); + } + + if (useMockInstance()) { + return new MockInstance(instanceName); + } + + String zookeepers = getZooKeepers(); + + // Fail with a good message + if (null == zookeepers) { + throw new IllegalArgumentException("ZooKeeper quorum string must be provided in hiveconf using " + ZOOKEEPERS); + } + + return new ZooKeeperInstance(instanceName, zookeepers); + } + + public Connector getConnector() throws AccumuloException, AccumuloSecurityException { + Instance inst = getInstance(); + return getConnector(inst); + } + + public Connector getConnector(Instance inst) throws AccumuloException, AccumuloSecurityException { + String username = getAccumuloUserName(), password = getAccumuloPassword(); + + // Fail with a good message + if (null == username) { + throw new IllegalArgumentException("Accumulo user name must be provided in hiveconf using " + USER_NAME); + } + if (null == password) { + throw new IllegalArgumentException("Accumulo password must be provided in hiveconf using " + USER_PASS); + } + + return inst.getConnector(username, new PasswordToken(password)); + } +} Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java (added) +++ 
import java.nio.charset.Charset;

/**
 * Shared constants for the Accumulo-Hive integration: the special rowID
 * mapping token, the separator/escape characters used in column-mapping
 * strings, and their escaped and regex-escaped forms.
 */
public class AccumuloHiveConstants {
  /** Column-mapping token that binds a Hive column to the Accumulo row ID. */
  public static final String ROWID = ":rowID";

  public static final char COLON = ':', COMMA = ',', ESCAPE = '\\', POUND = '#', ASTERISK = '*';

  /** A colon preceded by the escape character: {@code \:}. */
  public static final String ESCAPED_COLON = Character.toString(ESCAPE) + Character.toString(COLON);

  // Escape the escape: regex form that matches ESCAPED_COLON
  public static final String ESCAPED_COLON_REGEX = Character.toString(ESCAPE)
      + Character.toString(ESCAPE) + Character.toString(COLON);

  /** An asterisk preceded by the escape character: {@code \*}. */
  public static final String ESCAPED_ASTERISK = Character.toString(ESCAPE)
      + Character.toString(ASTERISK);

  // Escape the escape, and escape the asterisk: regex form that matches ESCAPED_ASTERISK.
  // NOTE: the name is a historical typo ("ASERTISK"); it is public API, so it is kept
  // for backward compatibility. Prefer ESCAPED_ASTERISK_REGEX in new code.
  public static final String ESCAPED_ASERTISK_REGEX = Character.toString(ESCAPE)
      + Character.toString(ESCAPE) + Character.toString(ESCAPE) + Character.toString(ASTERISK);

  /** Correctly spelled alias for {@link #ESCAPED_ASERTISK_REGEX}. */
  public static final String ESCAPED_ASTERISK_REGEX = ESCAPED_ASERTISK_REGEX;

  public static final Charset UTF_8 = Charset.forName("UTF-8");
}
http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,230 @@ +package org.apache.hadoop.hive.accumulo; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Preconditions; + +/** + * Holds column tuples for rowID. Each tuple contains column family label, qualifier label, and byte + * array value. + */ +public class AccumuloHiveRow implements Writable { + + private String rowId; + private List<ColumnTuple> tuples = new ArrayList<ColumnTuple>(); + + public AccumuloHiveRow() {} + + public AccumuloHiveRow(String rowId) { + this.rowId = rowId; + } + + public void setRowId(String rowId) { + this.rowId = rowId; + } + + public List<ColumnTuple> getTuples() { + return Collections.unmodifiableList(tuples); + } + + /** + * @return true if this instance has a tuple containing fam and qual, false otherwise. + */ + public boolean hasFamAndQual(Text fam, Text qual) { + for (ColumnTuple tuple : tuples) { + if (tuple.getCf().equals(fam) && tuple.getCq().equals(qual)) { + return true; + } + } + return false; + } + + /** + * @return byte [] value for first tuple containing fam and qual or null if no match. 
+ */ + public byte[] getValue(Text fam, Text qual) { + for (ColumnTuple tuple : tuples) { + if (tuple.getCf().equals(fam) && tuple.getCq().equals(qual)) { + return tuple.getValue(); + } + } + return null; + } + + public String getRowId() { + return rowId; + } + + public void clear() { + this.rowId = null; + this.tuples = new ArrayList<ColumnTuple>(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AccumuloHiveRow{"); + builder.append("rowId='").append(rowId).append("', tuples: "); + for (ColumnTuple tuple : tuples) { + builder.append(tuple.toString()); + builder.append("\n"); + } + return builder.toString(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof AccumuloHiveRow) { + AccumuloHiveRow other = (AccumuloHiveRow) o; + if (null == rowId) { + if (null != other.rowId) { + return false; + } + } else if (!rowId.equals(other.rowId)) { + return false; + } + + return tuples.equals(other.tuples); + } + + return false; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + if (null != rowId) { + dataOutput.writeBoolean(true); + dataOutput.writeUTF(rowId); + } else { + dataOutput.writeBoolean(false); + } + int size = tuples.size(); + dataOutput.writeInt(size); + for (ColumnTuple tuple : tuples) { + Text cf = tuple.getCf(), cq = tuple.getCq(); + dataOutput.writeInt(cf.getLength()); + dataOutput.write(cf.getBytes(), 0, cf.getLength()); + dataOutput.writeInt(cq.getLength()); + dataOutput.write(cq.getBytes(), 0, cq.getLength()); + byte[] value = tuple.getValue(); + dataOutput.writeInt(value.length); + dataOutput.write(value); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + if (dataInput.readBoolean()) { + rowId = dataInput.readUTF(); + } + int size = dataInput.readInt(); + for (int i = 0; i < size; i++) { + int cfLength = dataInput.readInt(); + byte[] cfData = new byte[cfLength]; + dataInput.readFully(cfData, 0, cfLength); + Text cf = 
new Text(cfData); + int cqLength = dataInput.readInt(); + byte[] cqData = new byte[cqLength]; + dataInput.readFully(cqData, 0, cqLength); + Text cq = new Text(cqData); + int valSize = dataInput.readInt(); + byte[] val = new byte[valSize]; + for (int j = 0; j < valSize; j++) { + val[j] = dataInput.readByte(); + } + tuples.add(new ColumnTuple(cf, cq, val)); + } + } + + public void add(String cf, String qual, byte[] val) { + Preconditions.checkNotNull(cf); + Preconditions.checkNotNull(qual); + Preconditions.checkNotNull(val); + + add(new Text(cf), new Text(qual), val); + } + + public void add(Text cf, Text qual, byte[] val) { + Preconditions.checkNotNull(cf); + Preconditions.checkNotNull(qual); + Preconditions.checkNotNull(val); + + tuples.add(new ColumnTuple(cf, qual, val)); + } + + public static class ColumnTuple { + private final Text cf; + private final Text cq; + private final byte[] value; + + public ColumnTuple(Text cf, Text cq, byte[] value) { + this.value = value; + this.cf = cf; + this.cq = cq; + } + + public byte[] getValue() { + return value; + } + + public Text getCf() { + return cf; + } + + public Text getCq() { + return cq; + } + + @Override + public int hashCode() { + HashCodeBuilder hcb = new HashCodeBuilder(9683, 68783); + return hcb.append(cf).append(cq).append(value).toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof ColumnTuple) { + ColumnTuple other = (ColumnTuple) o; + if (null == cf) { + if (null != other.cf) { + return false; + } + } else if (!cf.equals(other.cf)) { + return false; + } + + if (null == cq) { + if (null != other.cq) { + return false; + } + } else if (!cq.equals(other.cq)) { + return false; + } + + if (null == value) { + if (null != other.value) { + return false; + } + } + + return Arrays.equals(value, other.value); + } + + return false; + } + + @Override + public String toString() { + return cf + " " + cq + " " + new String(value); + } + } +} Added: 
package org.apache.hadoop.hive.accumulo;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.fate.Fate;
import org.apache.accumulo.start.Main;
import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableInputFormat;
import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat;
import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Create table mapping to Accumulo for Hive. Handle predicate pushdown if necessary.
 */
public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook,
    HiveStoragePredicateHandler {
  private static final Logger log = LoggerFactory.getLogger(AccumuloStorageHandler.class);

  // Hive's "default" database: table names in it are used unqualified in Accumulo
  private static final String DEFAULT_PREFIX = "default";

  protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance();
  protected AccumuloConnectionParameters connectionParams;
  protected Configuration conf;

  /**
   * Push down table properties into the JobConf.
   *
   * @param desc
   *          Hive table description
   * @param jobProps
   *          Properties that will be added to the JobConf by Hive
   */
  @Override
  public void configureTableJobProperties(TableDesc desc, Map<String,String> jobProps) {
    // Should not be getting invoked, configureInputJobProperties or configureOutputJobProperties
    // should be invoked instead. Delegate to both so we still behave sanely if Hive calls us here.
    configureInputJobProperties(desc, jobProps);
    configureOutputJobProperties(desc, jobProps);
  }

  /**
   * Compute the Accumulo table name for the given metastore table, preferring an explicit
   * TBLPROPERTIES value, then SERDEPROPERTIES, then falling back to the (db-qualified) Hive name.
   *
   * @param table
   *          Metastore table definition
   * @return The name of the table in Accumulo
   * @throws MetaException
   *           Declared for subclasses; not thrown by this implementation
   */
  protected String getTableName(Table table) throws MetaException {
    // Use TBLPROPERTIES
    String tableName = table.getParameters().get(AccumuloSerDeParameters.TABLE_NAME);

    if (null != tableName) {
      return tableName;
    }

    // Then try SERDEPROPERTIES
    tableName = table.getSd().getSerdeInfo().getParameters()
        .get(AccumuloSerDeParameters.TABLE_NAME);

    if (null != tableName) {
      return tableName;
    }

    // Use the hive table name, ignoring the default database
    if (DEFAULT_PREFIX.equals(table.getDbName())) {
      return table.getTableName();
    } else {
      return table.getDbName() + "." + table.getTableName();
    }
  }

  /**
   * Compute the Accumulo table name for the given table descriptor, preferring an explicit
   * table-properties value and falling back to the metastore table name with any leading
   * "default." prefix stripped.
   *
   * @param tableDesc
   *          Hive table descriptor
   * @return The name of the table in Accumulo, or null if neither property is set
   */
  protected String getTableName(TableDesc tableDesc) {
    Properties props = tableDesc.getProperties();
    String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
    if (null != tableName) {
      return tableName;
    }

    tableName = props.getProperty(hive_metastoreConstants.META_TABLE_NAME);

    // Guard against a missing metastore table name: the original code would NPE on startsWith
    if (null != tableName && tableName.startsWith(DEFAULT_PREFIX + ".")) {
      return tableName.substring(DEFAULT_PREFIX.length() + 1);
    }

    return tableName;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // Re-derive the Accumulo connection information whenever the configuration changes
    connectionParams = new AccumuloConnectionParameters(conf);
  }

  @SuppressWarnings("deprecation")
  @Override
  public Class<? extends SerDe> getSerDeClass() {
    return AccumuloSerDe.class;
  }

  @Override
  public HiveMetaHook getMetaHook() {
    return this;
  }

  @Override
  public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
    return null;
  }

  /**
   * Copy the SerDe/table properties that the input format needs into the job properties.
   *
   * @param tableDesc
   *          Hive table descriptor
   * @param jobProperties
   *          Properties that will be added to the JobConf by Hive
   */
  @Override
  public void configureInputJobProperties(TableDesc tableDesc, Map<String,String> jobProperties) {
    Properties props = tableDesc.getProperties();

    // Fail fast with a clear message instead of propagating a null value into the job
    String columnMappings = props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS);
    if (null == columnMappings) {
      throw new IllegalArgumentException(AccumuloSerDeParameters.COLUMN_MAPPINGS
          + " is missing from the table properties");
    }
    jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS, columnMappings);

    String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
    if (null == tableName) {
      tableName = getTableName(tableDesc);
    }
    jobProperties.put(AccumuloSerDeParameters.TABLE_NAME, tableName);

    String useIterators = props.getProperty(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY);
    if (useIterators != null) {
      // Only "true" or "false" (case-insensitive) are legal values
      if (!useIterators.equalsIgnoreCase("true") && !useIterators.equalsIgnoreCase("false")) {
        throw new IllegalArgumentException("Expected value of true or false for "
            + AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY);
      }

      jobProperties.put(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY, useIterators);
    }

    String storageType = props.getProperty(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE);
    if (null != storageType) {
      jobProperties.put(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE, storageType);
    }

    String authValue = props.getProperty(AccumuloSerDeParameters.AUTHORIZATIONS_KEY);
    if (null != authValue) {
      jobProperties.put(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, authValue);
    }

    log.info("Computed input job properties of " + jobProperties);
  }

  /**
   * Copy the SerDe/table properties that the output format needs into the job properties.
   *
   * @param tableDesc
   *          Hive table descriptor
   * @param jobProperties
   *          Properties that will be added to the JobConf by Hive
   */
  @Override
  public void configureOutputJobProperties(TableDesc tableDesc, Map<String,String> jobProperties) {
    Properties props = tableDesc.getProperties();

    // Adding these job properties will make them available to the OutputFormat in checkOutputSpecs
    // Fail fast with a clear message instead of propagating a null value into the job
    String columnMappings = props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS);
    if (null == columnMappings) {
      throw new IllegalArgumentException(AccumuloSerDeParameters.COLUMN_MAPPINGS
          + " is missing from the table properties");
    }
    jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS, columnMappings);

    String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
    if (null == tableName) {
      tableName = getTableName(tableDesc);
    }
    jobProperties.put(AccumuloSerDeParameters.TABLE_NAME, tableName);

    if (props.containsKey(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)) {
      jobProperties.put(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE,
          props.getProperty(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE));
    }

    if (props.containsKey(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY)) {
      jobProperties.put(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY,
          props.getProperty(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY));
    }
  }

  @SuppressWarnings("rawtypes")
  @Override
  public Class<? extends InputFormat> getInputFormatClass() {
    return HiveAccumuloTableInputFormat.class;
  }

  @Override
  @SuppressWarnings("rawtypes")
  public Class<? extends OutputFormat> getOutputFormatClass() {
    return HiveAccumuloTableOutputFormat.class;
  }

  /**
   * Validate the table definition and create the backing Accumulo table when the Hive table is
   * managed (non-EXTERNAL). EXTERNAL tables must already exist in Accumulo; managed tables must
   * not.
   *
   * @param table
   *          Metastore table being created
   * @throws MetaException
   *           If the definition is invalid or Accumulo cannot be reached
   */
  @Override
  public void preCreateTable(Table table) throws MetaException {
    boolean isExternal = isExternalTable(table);
    if (table.getSd().getLocation() != null) {
      throw new MetaException("Location can't be specified for Accumulo");
    }

    Map<String,String> serdeParams = table.getSd().getSerdeInfo().getParameters();
    String columnMapping = serdeParams.get(AccumuloSerDeParameters.COLUMN_MAPPINGS);
    if (columnMapping == null) {
      throw new MetaException(AccumuloSerDeParameters.COLUMN_MAPPINGS
          + " missing from SERDEPROPERTIES");
    }

    try {
      String tblName = getTableName(table);
      Connector connector = connectionParams.getConnector();
      TableOperations tableOpts = connector.tableOperations();

      // Attempt to create the table, taking EXTERNAL into consideration
      if (!tableOpts.exists(tblName)) {
        if (!isExternal) {
          tableOpts.create(tblName);
        } else {
          throw new MetaException("Accumulo table " + tblName
              + " doesn't exist even though declared external");
        }
      } else {
        if (!isExternal) {
          throw new MetaException("Table " + tblName
              + " already exists in Accumulo. Use CREATE EXTERNAL TABLE to register with Hive.");
        }
      }
    } catch (AccumuloSecurityException e) {
      throw new MetaException(StringUtils.stringifyException(e));
    } catch (TableExistsException e) {
      throw new MetaException(StringUtils.stringifyException(e));
    } catch (AccumuloException e) {
      throw new MetaException(StringUtils.stringifyException(e));
    }
  }

  protected boolean isExternalTable(Table table) {
    return MetaStoreUtils.isExternalTable(table);
  }

  @Override
  public void rollbackCreateTable(Table table) throws MetaException {
    // Same as commitDropTable where we always delete the data (accumulo table)
    commitDropTable(table, true);
  }

  @Override
  public void commitCreateTable(Table table) throws MetaException {
    // do nothing
  }

  /**
   * Delete the backing Accumulo table for a managed Hive table when deleteData is requested.
   * EXTERNAL tables are never touched.
   *
   * @param table
   *          Metastore table being dropped
   * @param deleteData
   *          Whether Hive asked for the underlying data to be removed
   * @throws MetaException
   *           If the Accumulo table cannot be deleted
   */
  @Override
  public void commitDropTable(Table table, boolean deleteData) throws MetaException {
    String tblName = getTableName(table);
    if (!isExternalTable(table)) {
      try {
        if (deleteData) {
          TableOperations tblOpts = connectionParams.getConnector().tableOperations();
          if (tblOpts.exists(tblName)) {
            tblOpts.delete(tblName);
          }
        }
      } catch (AccumuloException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      } catch (AccumuloSecurityException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      } catch (TableNotFoundException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      }
    }
  }

  @Override
  public void preDropTable(Table table) throws MetaException {
    // do nothing
  }

  @Override
  public void rollbackDropTable(Table table) throws MetaException {
    // do nothing
  }

  /**
   * Decompose the filter expression into the portion Accumulo iterators can evaluate server-side
   * and the residual Hive must still apply, when iterator pushdown is enabled on the SerDe.
   *
   * @param conf
   *          Job configuration
   * @param deserializer
   *          Must be an AccumuloSerDe
   * @param desc
   *          Filter expression from Hive
   * @return The decomposed predicate, or null when pushdown is disabled
   */
  @Override
  public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deserializer,
      ExprNodeDesc desc) {
    if (!(deserializer instanceof AccumuloSerDe)) {
      throw new RuntimeException("Expected an AccumuloSerDe but got "
          + deserializer.getClass().getName());
    }

    AccumuloSerDe serDe = (AccumuloSerDe) deserializer;
    if (serDe.getIteratorPushdown()) {
      return predicateHandler.decompose(conf, desc);
    } else {
      log.info("Set to ignore Accumulo iterator pushdown, skipping predicate handler.");
      return null;
    }
  }

  /**
   * Ship the Accumulo/ZooKeeper dependency jars and any row-id factory jars with the job. Failures
   * here are logged rather than thrown: the job may still work if the cluster already has the jars.
   *
   * @param tableDesc
   *          Hive table descriptor
   * @param jobConf
   *          Job configuration to augment
   */
  @Override
  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
    try {
      Utils.addDependencyJars(jobConf, Tracer.class, Fate.class, Connector.class, Main.class,
          ZooKeeper.class, AccumuloStorageHandler.class);
    } catch (IOException e) {
      log.error("Could not add necessary Accumulo dependencies to classpath", e);
    }

    Properties tblProperties = tableDesc.getProperties();
    AccumuloSerDeParameters serDeParams = null;
    try {
      serDeParams = new AccumuloSerDeParameters(jobConf, tblProperties, AccumuloSerDe.class.getName());
    } catch (SerDeException e) {
      log.error("Could not instantiate AccumuloSerDeParameters", e);
      return;
    }

    try {
      serDeParams.getRowIdFactory().addDependencyJars(jobConf);
    } catch (IOException e) {
      log.error("Could not add necessary dependencies for " + serDeParams.getRowIdFactory().getClass(), e);
    }
  }
}
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.accumulo;

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hive.accumulo.AccumuloHiveRow.ColumnTuple;
import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyMap;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import com.google.common.base.Charsets;

/**
 * A Hive Map created from some collection of Key-Values from one to many column families with one
 * to many column qualifiers.
 */
public class LazyAccumuloMap extends LazyMap {

  // The Accumulo row the map entries are lazily materialized from
  protected AccumuloHiveRow sourceRow;
  // Selects which column family / qualifier-prefix of the row belongs to this map
  protected HiveAccumuloMapColumnMapping columnMapping;

  public LazyAccumuloMap(LazyMapObjectInspector oi) {
    super(oi);
  }

  /**
   * (Re)bind this lazy map to a row and mapping. Marks the map unparsed so the entries are
   * rebuilt on next access; this instance is reused across rows.
   *
   * @param row
   *          The Accumulo row to read map entries from
   * @param columnMapping
   *          The column family and qualifier prefix identifying this map's entries
   */
  public void init(AccumuloHiveRow row, HiveAccumuloMapColumnMapping columnMapping) {
    this.sourceRow = row;
    this.columnMapping = columnMapping;

    this.setParsed(false);
  }

  /**
   * Materialize cachedMap from the source row: every tuple in the mapped column family whose
   * qualifier carries the configured prefix becomes one (lazy key, lazy value) entry. The
   * qualifier (minus prefix) is the key, the cell value is the map value.
   */
  protected void parse() {
    // Reuse the backing LinkedHashMap between rows to avoid reallocation
    if (null == this.cachedMap) {
      this.cachedMap = new LinkedHashMap<Object,Object>();
    } else {
      this.cachedMap.clear();
    }

    LazyMapObjectInspector lazyMoi = getInspector();

    Text cf = new Text(columnMapping.getColumnFamily());
    for (ColumnTuple tuple : sourceRow.getTuples()) {
      String cq = tuple.getCq().toString();

      if (!cf.equals(tuple.getCf()) || !cq.startsWith(columnMapping.getColumnQualifierPrefix())) {
        // A column family or qualifier we don't want to include in the map
        continue;
      }

      // Because we append the cq prefix when serializing the column
      // we should also remove it when pulling it from Accumulo
      cq = cq.substring(columnMapping.getColumnQualifierPrefix().length());

      // Keys are always primitive, respect the binary
      LazyPrimitive<? extends ObjectInspector,? extends Writable> key = LazyFactory
          .createLazyPrimitiveClass((PrimitiveObjectInspector) lazyMoi.getMapKeyObjectInspector(),
              ColumnEncoding.BINARY == columnMapping.getKeyEncoding());

      ByteArrayRef keyRef = new ByteArrayRef();
      keyRef.setData(cq.getBytes(Charsets.UTF_8));
      key.init(keyRef, 0, keyRef.getData().length);

      // Value can be anything, use the obj inspector and respect binary
      LazyObject<?> value = LazyFactory.createLazyObject(lazyMoi.getMapValueObjectInspector(),
          ColumnEncoding.BINARY == columnMapping.getValueEncoding());

      ByteArrayRef valueRef = new ByteArrayRef();
      valueRef.setData(tuple.getValue());
      value.init(valueRef, 0, valueRef.getData().length);

      cachedMap.put(key, value);
    }

    this.setParsed(true);
  }

  /**
   * Get the value in the map for the given key.
   *
   * @param key
   *          The key, a column qualifier, from the map
   * @return The object in the map at the given key
   */
  @Override
  public Object getMapValueElement(Object key) {
    if (!getParsed()) {
      parse();
    }

    // Linear scan: the cached keys are LazyPrimitives, so compare their writable form
    // against the caller-supplied key rather than relying on Map lookup semantics
    for (Map.Entry<Object,Object> entry : cachedMap.entrySet()) {
      LazyPrimitive<?,?> lazyKey = (LazyPrimitive<?,?>) entry.getKey();

      // getWritableObject() will convert LazyPrimitive to actual primitive
      // writable objects.
      Object keyI = lazyKey.getWritableObject();
      if (keyI == null) {
        continue;
      }
      if (keyI.equals(key)) {
        // Got a match, return the value
        LazyObject<?> v = (LazyObject<?>) entry.getValue();
        return v == null ? v : v.getObject();
      }
    }

    // No entry with a matching key
    return null;
  }

  /** @return the fully-parsed map of lazy keys to lazy values, parsing on first access */
  @Override
  public Map<Object,Object> getMap() {
    if (!getParsed()) {
      parse();
    }
    return cachedMap;
  }

  /** @return the number of entries in the map, parsing on first access */
  @Override
  public int getMapSize() {
    if (!getParsed()) {
      parse();
    }
    return cachedMap.size();
  }
}
Parses column tuples in each AccumuloHiveRow and creates Lazy objects for each field. + * + */ +public class LazyAccumuloRow extends LazyStruct { + private static final Logger log = Logger.getLogger(LazyAccumuloRow.class); + + private AccumuloHiveRow row; + private List<ColumnMapping> columnMappings; + private ArrayList<Object> cachedList = new ArrayList<Object>(); + private AccumuloRowIdFactory rowIdFactory; + + public LazyAccumuloRow(LazySimpleStructObjectInspector inspector) { + super(inspector); + } + + public void init(AccumuloHiveRow hiveRow, List<ColumnMapping> columnMappings, + AccumuloRowIdFactory rowIdFactory) { + this.row = hiveRow; + this.columnMappings = columnMappings; + this.rowIdFactory = rowIdFactory; + setParsed(false); + } + + private void parse() { + if (getFields() == null) { + // Will properly set string or binary serialization via createLazyField(...) + initLazyFields(oi.getAllStructFieldRefs()); + } + if (!getParsed()) { + Arrays.fill(getFieldInited(), false); + setParsed(true); + } + } + + @Override + public Object getField(int id) { + if (!getParsed()) { + parse(); + } + return uncheckedGetField(id); + } + + /* + * split pairs by delimiter. 
+ */ + private Object uncheckedGetField(int id) { + if (!getFieldInited()[id]) { + ByteArrayRef ref; + ColumnMapping columnMapping = columnMappings.get(id); + + if (columnMapping instanceof HiveAccumuloMapColumnMapping) { + HiveAccumuloMapColumnMapping mapColumnMapping = (HiveAccumuloMapColumnMapping) columnMapping; + + LazyAccumuloMap map = (LazyAccumuloMap) getFields()[id]; + map.init(row, mapColumnMapping); + } else { + if (columnMapping instanceof HiveAccumuloRowIdColumnMapping) { + // Use the rowID directly + ref = new ByteArrayRef(); + ref.setData(row.getRowId().getBytes()); + } else if (columnMapping instanceof HiveAccumuloColumnMapping) { + HiveAccumuloColumnMapping accumuloColumnMapping = (HiveAccumuloColumnMapping) columnMapping; + + // Use the colfam and colqual to get the value + byte[] val = row.getValue(new Text(accumuloColumnMapping.getColumnFamily()), new Text( + accumuloColumnMapping.getColumnQualifier())); + if (val == null) { + return null; + } else { + ref = new ByteArrayRef(); + ref.setData(val); + } + } else { + log.error("Could not process ColumnMapping of type " + columnMapping.getClass() + + " at offset " + id + " in column mapping: " + columnMapping.getMappingSpec()); + throw new IllegalArgumentException("Cannot process ColumnMapping of type " + + columnMapping.getClass()); + } + + getFields()[id].init(ref, 0, ref.getData().length); + } + + // HIVE-3179 only init the field when it isn't null + getFieldInited()[id] = true; + } + + return getFields()[id].getObject(); + } + + @Override + public ArrayList<Object> getFieldsAsList() { + if (!getParsed()) { + parse(); + } + cachedList.clear(); + for (int i = 0; i < getFields().length; i++) { + cachedList.add(uncheckedGetField(i)); + } + return cachedList; + } + + @Override + protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException { + final ColumnMapping columnMapping = columnMappings.get(fieldID); + + if (columnMapping instanceof 
HiveAccumuloRowIdColumnMapping) { + return rowIdFactory.createRowId(fieldRef.getFieldObjectInspector()); + } else if (columnMapping instanceof HiveAccumuloMapColumnMapping) { + return new LazyAccumuloMap((LazyMapObjectInspector) fieldRef.getFieldObjectInspector()); + } else { + return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(), + ColumnEncoding.BINARY == columnMapping.getEncoding()); + } + } +} Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.hive.accumulo; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLDecoder; +import java.text.MessageFormat; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.jar.JarFile; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; +import java.util.zip.ZipOutputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Logger; + +import com.google.common.base.Preconditions; + +/** + * Accumulo doesn't have a TableMapReduceUtil.addDependencyJars method like HBase which is very + * helpful + */ +public class Utils { + private static final Logger log = Logger.getLogger(Utils.class); + + // Thanks, HBase + public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + Set<String> jars = new HashSet<String>(); + // Add jars that are already in the tmpjars variable + jars.addAll(conf.getStringCollection("tmpjars")); + + // add jars as we find them to a map of contents jar name so that we can + // avoid + // creating new jars for classes that have already been packaged. 
+ Map<String,String> packagedClasses = new HashMap<String,String>(); + + // Add jars containing the specified classes + for (Class<?> clazz : classes) { + if (clazz == null) + continue; + + Path path = findOrCreateJar(clazz, localFs, packagedClasses); + if (path == null) { + log.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster."); + continue; + } + if (!localFs.exists(path)) { + log.warn("Could not validate jar file " + path + " for class " + clazz); + continue; + } + jars.add(path.toString()); + } + if (jars.isEmpty()) + return; + + conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); + } + + /** + * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds the Jar for a class or + * creates it if it doesn't exist. If the class is in a directory in the classpath, it creates a + * Jar on the fly with the contents of the directory and returns the path to that Jar. If a Jar is + * created, it is created in the system temporary directory. Otherwise, returns an existing jar + * that contains a class of the same name. Maintains a mapping from jar contents to the tmp jar + * created. + * + * @param my_class + * the class to find. + * @param fs + * the FileSystem with which to qualify the returned path. + * @param packagedClasses + * a map of class name to path. + * @return a jar file that contains the class. + * @throws IOException + */ + @SuppressWarnings("deprecation") + private static Path findOrCreateJar(Class<?> my_class, FileSystem fs, + Map<String,String> packagedClasses) throws IOException { + // attempt to locate an existing jar for the class. 
+ String jar = findContainingJar(my_class, packagedClasses); + if (null == jar || jar.isEmpty()) { + jar = getJar(my_class); + updateMap(jar, packagedClasses); + } + + if (null == jar || jar.isEmpty()) { + return null; + } + + log.debug(String.format("For class %s, using jar %s", my_class.getName(), jar)); + return new Path(jar).makeQualified(fs); + } + + /** + * Add entries to <code>packagedClasses</code> corresponding to class files contained in + * <code>jar</code>. + * + * @param jar + * The jar who's content to list. + * @param packagedClasses + * map[class -> jar] + */ + private static void updateMap(String jar, Map<String,String> packagedClasses) throws IOException { + if (null == jar || jar.isEmpty()) { + return; + } + ZipFile zip = null; + try { + zip = new ZipFile(jar); + for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) { + ZipEntry entry = iter.nextElement(); + if (entry.getName().endsWith("class")) { + packagedClasses.put(entry.getName(), jar); + } + } + } finally { + if (null != zip) + zip.close(); + } + } + + /** + * Find a jar that contains a class of the same name, if any. It will return a jar file, even if + * that is not the first thing on the class path that has a class with the same name. Looks first + * on the classpath and then in the <code>packagedClasses</code> map. + * + * @param my_class + * the class to find. + * @return a jar file that contains the class, or null. 
+ * @throws IOException + */ + private static String findContainingJar(Class<?> my_class, Map<String,String> packagedClasses) + throws IOException { + ClassLoader loader = my_class.getClassLoader(); + String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; + + // first search the classpath + for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) { + URL url = itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + // URLDecoder is a misnamed class, since it actually decodes + // x-www-form-urlencoded MIME type rather than actual + // URL encoding (which the file path has). Therefore it would + // decode +s to ' 's which is incorrect (spaces are actually + // either unencoded or encoded as "%20"). Replace +s first, so + // that they are kept sacred during the decoding process. + toReturn = toReturn.replaceAll("\\+", "%2B"); + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + + // now look in any jars we've packaged using JarFinder. Returns null + // when + // no jar is found. + return packagedClasses.get(class_file); + } + + /** + * Invoke 'getJar' on a JarFinder implementation. Useful for some job configuration contexts + * (HBASE-8140) and also for testing on MRv2. First check if we have HADOOP-9426. Lacking that, + * fall back to the backport. + * + * @param my_class + * the class to find. + * @return a jar file that contains the class, or null. 
+ */ + private static String getJar(Class<?> my_class) { + String ret = null; + String hadoopJarFinder = "org.apache.hadoop.util.JarFinder"; + Class<?> jarFinder = null; + try { + log.debug("Looking for " + hadoopJarFinder + "."); + jarFinder = Class.forName(hadoopJarFinder); + log.debug(hadoopJarFinder + " found."); + Method getJar = jarFinder.getMethod("getJar", Class.class); + ret = (String) getJar.invoke(null, my_class); + } catch (ClassNotFoundException e) { + log.debug("Using backported JarFinder."); + ret = jarFinderGetJar(my_class); + } catch (InvocationTargetException e) { + // function was properly called, but threw it's own exception. + // Unwrap it + // and pass it on. + throw new RuntimeException(e.getCause()); + } catch (Exception e) { + // toss all other exceptions, related to reflection failure + throw new RuntimeException("getJar invocation failed.", e); + } + + return ret; + } + + /** + * Returns the full path to the Jar containing the class. It always return a JAR. + * + * @param klass + * class. + * + * @return path to the Jar containing the class. 
+ */ + @SuppressWarnings("rawtypes") + public static String jarFinderGetJar(Class klass) { + Preconditions.checkNotNull(klass, "klass"); + ClassLoader loader = klass.getClassLoader(); + if (loader != null) { + String class_file = klass.getName().replaceAll("\\.", "/") + ".class"; + try { + for (Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) { + URL url = (URL) itr.nextElement(); + String path = url.getPath(); + if (path.startsWith("file:")) { + path = path.substring("file:".length()); + } + path = URLDecoder.decode(path, "UTF-8"); + if ("jar".equals(url.getProtocol())) { + path = URLDecoder.decode(path, "UTF-8"); + return path.replaceAll("!.*$", ""); + } else if ("file".equals(url.getProtocol())) { + String klassName = klass.getName(); + klassName = klassName.replace(".", "/") + ".class"; + path = path.substring(0, path.length() - klassName.length()); + File baseDir = new File(path); + File testDir = new File(System.getProperty("test.build.dir", "target/test-dir")); + testDir = testDir.getAbsoluteFile(); + if (!testDir.exists()) { + testDir.mkdirs(); + } + File tempJar = File.createTempFile("hadoop-", "", testDir); + tempJar = new File(tempJar.getAbsolutePath() + ".jar"); + createJar(baseDir, tempJar); + return tempJar.getAbsolutePath(); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + } + + private static void copyToZipStream(InputStream is, ZipEntry entry, ZipOutputStream zos) + throws IOException { + zos.putNextEntry(entry); + byte[] arr = new byte[4096]; + int read = is.read(arr); + while (read > -1) { + zos.write(arr, 0, read); + read = is.read(arr); + } + is.close(); + zos.closeEntry(); + } + + public static void jarDir(File dir, String relativePath, ZipOutputStream zos) throws IOException { + Preconditions.checkNotNull(relativePath, "relativePath"); + Preconditions.checkNotNull(zos, "zos"); + + // by JAR spec, if there is a manifest, it must be the first entry in + // the + // ZIP. 
+ File manifestFile = new File(dir, JarFile.MANIFEST_NAME); + ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME); + if (!manifestFile.exists()) { + zos.putNextEntry(manifestEntry); + new Manifest().write(new BufferedOutputStream(zos)); + zos.closeEntry(); + } else { + InputStream is = new FileInputStream(manifestFile); + copyToZipStream(is, manifestEntry, zos); + } + zos.closeEntry(); + zipDir(dir, relativePath, zos, true); + zos.close(); + } + + private static void zipDir(File dir, String relativePath, ZipOutputStream zos, boolean start) + throws IOException { + String[] dirList = dir.list(); + for (String aDirList : dirList) { + File f = new File(dir, aDirList); + if (!f.isHidden()) { + if (f.isDirectory()) { + if (!start) { + ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/"); + zos.putNextEntry(dirEntry); + zos.closeEntry(); + } + String filePath = f.getPath(); + File file = new File(filePath); + zipDir(file, relativePath + f.getName() + "/", zos, false); + } else { + String path = relativePath + f.getName(); + if (!path.equals(JarFile.MANIFEST_NAME)) { + ZipEntry anEntry = new ZipEntry(path); + InputStream is = new FileInputStream(f); + copyToZipStream(is, anEntry, zos); + } + } + } + } + } + + private static void createJar(File dir, File jarFile) throws IOException { + Preconditions.checkNotNull(dir, "dir"); + Preconditions.checkNotNull(jarFile, "jarFile"); + File jarDir = jarFile.getParentFile(); + if (!jarDir.exists()) { + if (!jarDir.mkdirs()) { + throw new IOException(MessageFormat.format("could not create dir [{0}]", jarDir)); + } + } + JarOutputStream zos = new JarOutputStream(new FileOutputStream(jarFile)); + jarDir(dir, "", zos); + } +} Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java?rev=1619005&view=auto 
============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.accumulo.columns; + +import java.util.HashMap; +import java.util.Map.Entry; + +import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * Encapsulate the encoding of values within the given column in Accumulo + */ +public enum ColumnEncoding { + STRING("string", "s"), BINARY("binary", "b"); + + private static final HashMap<String,ColumnEncoding> CODE_CACHE = new HashMap<String,ColumnEncoding>(), + NAME_CACHE = new HashMap<String,ColumnEncoding>(); + + static { + CODE_CACHE.put(STRING.getCode(), STRING); + CODE_CACHE.put(BINARY.getCode(), BINARY); + + NAME_CACHE.put(STRING.getName(), STRING); + NAME_CACHE.put(BINARY.getName(), BINARY); + } + + private final String name; + private final String code; + + private ColumnEncoding(String name, String code) { + this.name = name; + this.code = code; + } + + public String getName() { + return this.name; + } + + public String getCode() { + return code; + } + + /** + * Get the ColumnEncoding which has the given code. 
+ * + * @param code + * The one-character 'code' which uniquely identifies the ColumnEncoding + * @return The ColumnEncoding with the code equal to the provided argument + */ + public static ColumnEncoding fromCode(String code) { + if (!CODE_CACHE.containsKey(code)) { + throw new IllegalArgumentException("No ColumnEncoding defined with code " + code); + } + + return CODE_CACHE.get(code); + } + + public static ColumnEncoding fromName(String name) { + if (!NAME_CACHE.containsKey(name)) { + throw new IllegalArgumentException("No ColumnEncoding defined with name " + name); + } + + return NAME_CACHE.get(name); + } + + public static ColumnEncoding get(String nameOrCode) { + ColumnEncoding encoding = CODE_CACHE.get(nameOrCode); + if (null != encoding) { + return encoding; + } + + encoding = NAME_CACHE.get(nameOrCode); + if (null != encoding) { + return encoding; + } + + throw new IllegalArgumentException("No ColumnEncoding defined for " + nameOrCode); + } + + public static ColumnEncoding getFromMapping(String columnMapping) { + Preconditions.checkNotNull(columnMapping); + + String encoding = getColumnEncoding(columnMapping); + + return get(encoding); + } + + /** + * Determines if a custom encoding was specified for the give column. 
+ * + * @param columnMapping + * The mapping from Hive column to an Accumulo column + * @return True if the column mapping string specifies an encoding, false otherwise + */ + public static boolean hasColumnEncoding(String columnMapping) { + Preconditions.checkNotNull(columnMapping); + + int offset = columnMapping.lastIndexOf(AccumuloHiveConstants.POUND); + + // Make sure that the '#' wasn't escaped + if (0 < offset && AccumuloHiveConstants.ESCAPE == columnMapping.charAt(offset - 1)) { + // The encoding name/codes don't contain pound signs + return false; + } + + return -1 != offset; + } + + public static String getColumnEncoding(String columnMapping) { + int offset = columnMapping.lastIndexOf(AccumuloHiveConstants.POUND); + + // Make sure that the '#' wasn't escaped + if (0 < offset && AccumuloHiveConstants.ESCAPE == columnMapping.charAt(offset - 1)) { + throw new IllegalArgumentException("Column mapping did not contain a column encoding: " + + columnMapping); + } + + return columnMapping.substring(offset + 1); + } + + public static ColumnEncoding getDefault() { + return STRING; + } + + /** + * Removes the column encoding code and separator from the original column mapping string. Throws + * an IllegalArgumentException if this method is called on a string that doesn't contain a code. 
+ * + * @param columnMapping + * The mapping from Hive column to Accumulo column + * @return The column mapping with the code removed + */ + public static String stripCode(String columnMapping) { + Preconditions.checkNotNull(columnMapping); + + int offset = columnMapping.lastIndexOf(AccumuloHiveConstants.POUND); + if (-1 == offset + || (0 < offset && AccumuloHiveConstants.ESCAPE == columnMapping.charAt(offset - 1))) { + throw new IllegalArgumentException( + "Provided column mapping does not define a column encoding"); + } + + return columnMapping.substring(0, offset); + } + + public static boolean isMapEncoding(String columnEncoding) { + return -1 != columnEncoding.indexOf(AccumuloHiveConstants.COLON); + } + + public static Entry<ColumnEncoding,ColumnEncoding> getMapEncoding(String columnEncoding) { + int index = columnEncoding.indexOf(AccumuloHiveConstants.COLON); + if (-1 == index) { + throw new IllegalArgumentException( + "Serialized column encoding did not contain a pair of encodings to split"); + } + + String encoding1 = columnEncoding.substring(0, index), encoding2 = columnEncoding + .substring(index + 1); + + return Maps.immutableEntry(get(encoding1), get(encoding2)); + } +} Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.accumulo.columns;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;

import com.google.common.base.Preconditions;

/**
 * Parses and holds the ordered list of {@link ColumnMapping}s from Hive columns to Accumulo
 * columns, tracking which Hive column (if any) maps to the Accumulo rowID.
 */
public class ColumnMapper {
  private static final Logger log = Logger.getLogger(ColumnMapper.class);

  // One mapping per Hive column, in Hive schema order
  private List<ColumnMapping> columnMappings;
  // Offset of the rowID mapping within columnMappings, or -1 when absent
  private int rowIdOffset;
  private HiveAccumuloRowIdColumnMapping rowIdMapping = null;
  // Encoding applied to columns that do not specify their own
  private final ColumnEncoding defaultEncoding;

  /**
   * Create a mapping from Hive columns (rowID and column) to Accumulo columns (column family and
   * qualifier). The ordering of the {@link ColumnMapping}s is important as it aligns with the
   * ordering of the columns for the Hive table schema.
   *
   * @param serializedColumnMappings
   *          Comma-separated list of designators that map to Accumulo columns whose offsets
   *          correspond to the Hive table schema
   * @param defaultStorageType
   *          Table-level default encoding name/code; null or empty selects the global default
   * @param columnNames
   *          Hive column names, parallel to the parsed mappings
   * @param columnTypes
   *          Hive column types, parallel to the parsed mappings
   * @throws TooManyAccumuloColumnsException
   *           when more mappings are defined than Hive columns exist
   */
  public ColumnMapper(String serializedColumnMappings, String defaultStorageType,
      List<String> columnNames, List<TypeInfo> columnTypes) throws TooManyAccumuloColumnsException {
    Preconditions.checkNotNull(serializedColumnMappings);

    String[] parsedColumnMappingValue = StringUtils.split(serializedColumnMappings,
        AccumuloHiveConstants.COMMA);
    columnMappings = new ArrayList<ColumnMapping>(parsedColumnMappingValue.length);
    rowIdOffset = -1;

    // Determine the default encoding type (specified on the table, or the global default
    // if none was provided)
    if (null == defaultStorageType || "".equals(defaultStorageType)) {
      defaultEncoding = ColumnEncoding.getDefault();
    } else {
      defaultEncoding = ColumnEncoding.get(defaultStorageType.toLowerCase());
    }

    if (parsedColumnMappingValue.length > columnNames.size()) {
      throw new TooManyAccumuloColumnsException("Found " + parsedColumnMappingValue.length
          + " columns, but only know of " + columnNames.size() + " Hive column names");
    }

    if (parsedColumnMappingValue.length > columnTypes.size()) {
      // Report columnTypes.size() here (was columnNames.size(), a copy-paste slip)
      throw new TooManyAccumuloColumnsException("Found " + parsedColumnMappingValue.length
          + " columns, but only know of " + columnTypes.size() + " Hive column types");
    }

    for (int i = 0; i < parsedColumnMappingValue.length; i++) {
      String columnMappingStr = parsedColumnMappingValue[i];

      // Create the mapping for this column, with configured encoding
      ColumnMapping columnMapping = ColumnMappingFactory.get(columnMappingStr, defaultEncoding,
          columnNames.get(i), columnTypes.get(i));

      if (columnMapping instanceof HiveAccumuloRowIdColumnMapping) {
        // Only one column may map to the Accumulo rowID
        if (-1 != rowIdOffset) {
          throw new IllegalArgumentException(
              "Column mapping should only have one definition with a value of "
                  + AccumuloHiveConstants.ROWID);
        }

        rowIdOffset = i;
        rowIdMapping = (HiveAccumuloRowIdColumnMapping) columnMapping;
      }

      columnMappings.add(columnMapping);
    }
  }

  /**
   * Number of column mappings parsed.
   */
  public int size() {
    return columnMappings.size();
  }

  /**
   * The mapping at offset {@code i} in Hive schema order.
   */
  public ColumnMapping get(int i) {
    return columnMappings.get(i);
  }

  /**
   * An unmodifiable view of all mappings, in Hive schema order.
   */
  public List<ColumnMapping> getColumnMappings() {
    return Collections.unmodifiableList(columnMappings);
  }

  /**
   * True when one of the Hive columns maps to the Accumulo rowID.
   */
  public boolean hasRowIdMapping() {
    return null != rowIdMapping;
  }

  /**
   * The rowID mapping, or null when {@link #hasRowIdMapping()} is false.
   */
  public HiveAccumuloRowIdColumnMapping getRowIdMapping() {
    return rowIdMapping;
  }

  /**
   * Offset of the rowID mapping in the Hive schema, or -1 when absent.
   */
  public int getRowIdOffset() {
    return rowIdOffset;
  }

  /**
   * Builds the colon-separated Hive type string corresponding to the mappings: string for rowID
   * and plain columns, map&lt;string,string&gt; for map columns.
   */
  public String getTypesString() {
    StringBuilder sb = new StringBuilder();
    for (ColumnMapping columnMapping : columnMappings) {
      if (sb.length() > 0) {
        sb.append(AccumuloHiveConstants.COLON);
      }

      if (columnMapping instanceof HiveAccumuloRowIdColumnMapping) {
        // the rowID column is a string
        sb.append(serdeConstants.STRING_TYPE_NAME);
      } else if (columnMapping instanceof HiveAccumuloColumnMapping) {
        // a normal column is also a string
        sb.append(serdeConstants.STRING_TYPE_NAME);
      } else if (columnMapping instanceof HiveAccumuloMapColumnMapping) {
        // TODO can we be more precise than string,string?
        sb.append(serdeConstants.MAP_TYPE_NAME).append("<").append(serdeConstants.STRING_TYPE_NAME)
            .append(",").append(serdeConstants.STRING_TYPE_NAME).append(">");
      } else {
        throw new IllegalArgumentException("Cannot process ColumnMapping of type "
            + columnMapping.getClass().getName());
      }
    }

    return sb.toString();
  }

  /**
   * Finds the mapping whose offset matches {@code hiveColumnName}'s position in
   * {@code hiveColumns}.
   *
   * @throws IllegalArgumentException
   *           when the Hive column name is not found among the mapped columns
   */
  public ColumnMapping getColumnMappingForHiveColumn(List<String> hiveColumns, String hiveColumnName) {
    Preconditions.checkNotNull(hiveColumns);
    Preconditions.checkNotNull(hiveColumnName);
    // NOTE(review): the check permits fewer mappings than Hive columns (<=), despite the
    // "equal number" wording; the loop below also guards on both sizes.
    Preconditions.checkArgument(columnMappings.size() <= hiveColumns.size(),
        "Expected equal number of column mappings and Hive columns, " + columnMappings + ", "
            + hiveColumns);

    int hiveColumnOffset = 0;
    for (; hiveColumnOffset < hiveColumns.size() && hiveColumnOffset < columnMappings.size(); hiveColumnOffset++) {
      if (hiveColumns.get(hiveColumnOffset).equals(hiveColumnName)) {
        return columnMappings.get(hiveColumnOffset);
      }
    }

    log.error("Could not find offset for Hive column with name '" + hiveColumnName
        + "' with columns " + hiveColumns);
    throw new IllegalArgumentException("Could not find offset for Hive column with name "
        + hiveColumnName);
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder(32);
    sb.append("[").append(this.getClass().getSimpleName()).append(" ");
    sb.append(columnMappings).append(", rowIdOffset: ").append(this.rowIdOffset)
        .append(", defaultEncoding: ");
    sb.append(this.defaultEncoding).append("]");
    return sb.toString();
  }
}
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapping.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.accumulo.columns; + +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import com.google.common.base.Preconditions; + +/** + * + */ +public abstract class ColumnMapping { + + // SerDe property for how the Hive column maps to Accumulo + protected final String mappingSpec; + + // The manner in which the values in this column are de/serialized from/to Accumulo + protected final ColumnEncoding encoding; + + // The name of the Hive column + protected final String columnName; + + // The type of the Hive column + // Cannot store the actual TypeInfo because that would require + // Hive jars on the Accumulo classpath which we don't want + protected final String columnType; + + protected ColumnMapping(String mappingSpec, ColumnEncoding encoding, String columnName, + String columnType) { + Preconditions.checkNotNull(mappingSpec); + Preconditions.checkNotNull(encoding); + Preconditions.checkNotNull(columnName); + Preconditions.checkNotNull(columnType); + + this.mappingSpec = mappingSpec; + this.encoding = 
encoding; + this.columnName = columnName; + this.columnType = columnType; + } + + protected ColumnMapping(String mappingSpec, ColumnEncoding encoding, String columnName, + TypeInfo columnType) { + Preconditions.checkNotNull(mappingSpec); + Preconditions.checkNotNull(encoding); + Preconditions.checkNotNull(columnName); + Preconditions.checkNotNull(columnType); + + this.mappingSpec = mappingSpec; + this.encoding = encoding; + this.columnName = columnName; + this.columnType = columnType.getTypeName(); + } + + /** + * The property defining how this Column is mapped into Accumulo + */ + public String getMappingSpec() { + return mappingSpec; + } + + /** + * The manner in which the value is encoded in Accumulo + */ + public ColumnEncoding getEncoding() { + return encoding; + } + + /** + * The name of the Hive column this is mapping + */ + public String getColumnName() { + return columnName; + } + + /** + * The @{link TypeInfo} of the Hive column this is mapping + */ + public String getColumnType() { + return columnType; + } +}