Repository: carbondata Updated Branches: refs/heads/master 0be69cd36 -> 85cbad246
[CARBONDATA-1008] Use MetastoreListener to sync schema between spark and hive This closes #1214 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/85cbad24 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/85cbad24 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/85cbad24 Branch: refs/heads/master Commit: 85cbad2469ba66acc566ef4b7f493c5748158087 Parents: 0be69cd Author: cenyuhai <261810...@qq.com> Authored: Thu Aug 3 00:22:00 2017 +0800 Committer: chenliang613 <chenliang...@apache.org> Committed: Thu Aug 10 21:39:57 2017 +0800 ---------------------------------------------------------------------- integration/hive/hive-guide.md | 23 +++--- integration/hive/pom.xml | 14 +++- .../hive/CarbonHiveMetastoreListener.scala | 74 ++++++++++++++++++++ 3 files changed, 96 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cbad24/integration/hive/hive-guide.md ---------------------------------------------------------------------- diff --git a/integration/hive/hive-guide.md b/integration/hive/hive-guide.md index 9b72443..b3848b5 100644 --- a/integration/hive/hive-guide.md +++ b/integration/hive/hive-guide.md @@ -41,10 +41,17 @@ mvn -DskipTests -Pspark-2.1 -Phadoop-2.7.2 clean package $HADOOP_HOME/bin/hadoop fs -put sample.csv <hdfs store path>/sample.csv ``` +* Add the following property to $SPARK_CONF_DIR/hive-site.xml (by default $SPARK_HOME/conf/hive-site.xml) +```xml +<property> + <name>hive.metastore.pre.event.listeners</name> + <value>org.apache.carbondata.hive.CarbonHiveMetastoreListener</value> +</property> +``` * Start Spark shell by running the following command in the Spark directory ``` -./bin/spark-shell --jars <carbondata assembly jar path> +./bin/spark-shell --jars <carbondata assembly jar path>,<carbon hive jar path> ``` ``` @@ -69,6 +76,7 @@ mkdir hive/auxlibs/ cp 
carbondata/assembly/target/scala-2.11/carbondata_2.11*.jar hive/auxlibs/ cp carbondata/integration/hive/target/carbondata-hive-*.jar hive/auxlibs/ cp $SPARK_HOME/jars/spark-catalyst*.jar hive/auxlibs/ +cp $SPARK_HOME/jars/scala*.jar hive/auxlibs/ export HIVE_AUX_JARS_PATH=hive/auxlibs/ ``` ### Fix snappy issue @@ -80,19 +88,6 @@ export HADOOP_OPTS="-Dorg.xerial.snappy.lib.path=/Library/Java/Extensions -Dorg. ### Start hive client $HIVE_HOME/bin/hive -### Initialize schema in hive -``` -create table in hive: -CREATE TABLE IF NOT EXISTS hive_carbon(id int, name string, scale decimal, country string, salary double) row format delimited fields terminated by ',' stored as textfile; - -alter table hive_carbon set FILEFORMAT -INPUTFORMAT "org.apache.carbondata.hive.MapredCarbonInputFormat" -OUTPUTFORMAT "org.apache.carbondata.hive.MapredCarbonOutputFormat" -SERDE "org.apache.carbondata.hive.CarbonHiveSerDe"; - -alter table hive_carbon set LOCATION '<hdfs store path>/carbon/store/default/hive_carbon'; -``` - ### Query data from hive table ``` http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cbad24/integration/hive/pom.xml ---------------------------------------------------------------------- diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml index 12a1520..a5ba448 100644 --- a/integration/hive/pom.xml +++ b/integration/hive/pom.xml @@ -105,12 +105,18 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.11</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_${scala.binary.version}</artifactId> + <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> 
@@ -136,6 +142,12 @@ <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-hadoop</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cbad24/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala b/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala new file mode 100644 index 0000000..fd686ae --- /dev/null +++ b/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.hive
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.metastore.MetaStorePreEventListener
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, MetaException, SerDeInfo, Table}
+import org.apache.hadoop.hive.metastore.events._
+import org.apache.hadoop.hive.metastore.events.PreEventContext.PreEventType._
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+
+/**
+ * Hive metastore pre-event listener that makes CarbonData tables created from Spark readable
+ * by Hive.
+ *
+ * It acts on a CREATE_TABLE event when the table's `spark.sql.sources.provider` property is
+ * `org.apache.spark.sql.CarbonSource`. For such a table it does the following:
+ *  - copies the Spark schema, stored split into numbered parts in the
+ *    `spark.sql.sources.schema.*` table properties, into the storage descriptor's columns;
+ *  - installs the CarbonData input format, output format and SerDe;
+ *  - sets the table location from the `tablePath` SerDe parameter when one is present.
+ *
+ * Registered through `hive.metastore.pre.event.listeners` in `hive-site.xml`.
+ */
+class CarbonHiveMetastoreListener(conf: Configuration) extends MetaStorePreEventListener(conf) {
+
+  /**
+   * Rewrites CarbonData tables before the metastore creates them; every other event is ignored.
+   *
+   * @param preEventContext the event the metastore is about to apply
+   * @throws MetaException if the Spark schema in the table properties is incomplete or malformed
+   */
+  override def onEvent(preEventContext: PreEventContext): Unit = {
+    preEventContext.getEventType match {
+      case CREATE_TABLE =>
+        val table = preEventContext.asInstanceOf[PreCreateTableEvent].getTable
+        val tableProps = table.getParameters
+        if (isCarbonTable(tableProps)) {
+          readSparkSchema(tableProps).foreach(schema => syncToHive(table, schema))
+        }
+      case _ =>
+        // other metastore events need no schema synchronisation
+    }
+  }
+
+  /** Returns true when the table properties declare Spark's CarbonSource as the provider. */
+  private def isCarbonTable(tableProps: java.util.Map[String, String]): Boolean = {
+    tableProps != null &&
+      tableProps.get("spark.sql.sources.provider") == "org.apache.spark.sql.CarbonSource"
+  }
+
+  /**
+   * Reassembles the Spark schema JSON stored in the `spark.sql.sources.schema.part.N`
+   * properties and parses it.
+   *
+   * @param tableProps non-null table properties
+   * @return the schema, or None when `spark.sql.sources.schema.numParts` is absent or empty
+   * @throws MetaException if the part count is not a positive integer, a part is missing, or
+   *                       the reassembled JSON does not describe a struct type
+   */
+  private def readSparkSchema(tableProps: java.util.Map[String, String]): Option[StructType] = {
+    Option(tableProps.get("spark.sql.sources.schema.numParts")).filter(_.nonEmpty).map {
+      numParts =>
+        // Report a bad part count as a MetaException instead of a NumberFormatException
+        // (non-numeric) or a JSON parse error (zero/negative count yields an empty string).
+        val partCount = Try(numParts.toInt).toOption.filter(_ > 0).getOrElse {
+          throw new MetaException(s"spark.sql.sources.schema.numParts is invalid: $numParts")
+        }
+        val json = (0 until partCount).map { index =>
+          val part = tableProps.get(s"spark.sql.sources.schema.part.${index}")
+          if (part == null) {
+            throw new MetaException(s"spark.sql.sources.schema.part.${index} is missing!")
+          }
+          part
+        }.mkString
+        // Stick all parts back together and parse them as a single schema string; match
+        // instead of casting so a non-struct schema is reported rather than a ClassCastException.
+        DataType.fromJson(json) match {
+          case struct: StructType => struct
+          case other =>
+            throw new MetaException(s"Spark schema is not a struct type: ${other.typeName}")
+        }
+    }
+  }
+
+  /**
+   * Copies `schema` into the table's storage descriptor and switches the table to the
+   * CarbonData input format, output format and SerDe.
+   */
+  private def syncToHive(table: Table, schema: StructType): Unit = {
+    val sd = table.getSd
+    sd.setCols(schema.map(toHiveColumn).asJava)
+    sd.setInputFormat("org.apache.carbondata.hive.MapredCarbonInputFormat")
+    sd.setOutputFormat("org.apache.carbondata.hive.MapredCarbonOutputFormat")
+    // getSerdeInfo may return null when no SerDe info was set; create one in that case.
+    val serdeInfo = Option(sd.getSerdeInfo).getOrElse {
+      val created = new SerDeInfo()
+      sd.setSerdeInfo(created)
+      created
+    }
+    serdeInfo.setSerializationLib("org.apache.carbondata.hive.CarbonHiveSerDe")
+    // Use the `tablePath` SerDe parameter as the table location when it is present; the
+    // parameter map itself may be null.
+    Option(serdeInfo.getParameters)
+      .flatMap(params => Option(params.get("tablePath")))
+      .foreach(tablePath => sd.setLocation(tablePath))
+  }
+
+  /**
+   * Converts a Spark field to a Hive column. The type string recorded in the field metadata
+   * under `HIVE_TYPE_STRING` is preferred; otherwise the Spark catalog string is used.
+   */
+  private def toHiveColumn(c: StructField): FieldSchema = {
+    val typeString = if (c.metadata.contains("HIVE_TYPE_STRING")) {
+      c.metadata.getString("HIVE_TYPE_STRING")
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment().orNull)
+  }
+}