Repository: carbondata
Updated Branches:
  refs/heads/master 0be69cd36 -> 85cbad246


[CARBONDATA-1008] Use MetastoreListener to sync schema between spark and hive

This closes #1214


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/85cbad24
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/85cbad24
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/85cbad24

Branch: refs/heads/master
Commit: 85cbad2469ba66acc566ef4b7f493c5748158087
Parents: 0be69cd
Author: cenyuhai <261810...@qq.com>
Authored: Thu Aug 3 00:22:00 2017 +0800
Committer: chenliang613 <chenliang...@apache.org>
Committed: Thu Aug 10 21:39:57 2017 +0800

----------------------------------------------------------------------
 integration/hive/hive-guide.md                  | 23 +++---
 integration/hive/pom.xml                        | 14 +++-
 .../hive/CarbonHiveMetastoreListener.scala      | 74 ++++++++++++++++++++
 3 files changed, 96 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cbad24/integration/hive/hive-guide.md
----------------------------------------------------------------------
diff --git a/integration/hive/hive-guide.md b/integration/hive/hive-guide.md
index 9b72443..b3848b5 100644
--- a/integration/hive/hive-guide.md
+++ b/integration/hive/hive-guide.md
@@ -41,10 +41,17 @@ mvn -DskipTests -Pspark-2.1 -Phadoop-2.7.2 clean package
 $HADOOP_HOME/bin/hadoop fs -put sample.csv <hdfs store path>/sample.csv
 ```
 
+* Add the following property to $SPARK_HOME/conf/hive-site.xml
+```xml
+<property>
+  <name>hive.metastore.pre.event.listeners</name>
+  <value>org.apache.carbondata.hive.CarbonHiveMetastoreListener</value>
+</property>
+```
 * Start Spark shell by running the following command in the Spark directory
 
 ```
-./bin/spark-shell --jars <carbondata assembly jar path>
+./bin/spark-shell --jars <carbondata assembly jar path>,<carbon hive jar path>
 ```
 
 ```
@@ -69,6 +76,7 @@ mkdir hive/auxlibs/
 cp carbondata/assembly/target/scala-2.11/carbondata_2.11*.jar hive/auxlibs/
 cp carbondata/integration/hive/target/carbondata-hive-*.jar hive/auxlibs/
 cp $SPARK_HOME/jars/spark-catalyst*.jar hive/auxlibs/
+cp $SPARK_HOME/jars/scala*.jar hive/auxlibs/
 export HIVE_AUX_JARS_PATH=hive/auxlibs/
 ```
 ### Fix snappy issue
@@ -80,19 +88,6 @@ export 
HADOOP_OPTS="-Dorg.xerial.snappy.lib.path=/Library/Java/Extensions -Dorg.
 ### Start hive client
 $HIVE_HOME/bin/hive
 
-### Initialize schema in hive
-```
-create table in hive:
-CREATE TABLE IF NOT EXISTS hive_carbon(id int, name string, scale decimal, 
country string, salary double) row format delimited fields terminated by ',' 
stored as textfile;
-
-alter table hive_carbon set FILEFORMAT
-INPUTFORMAT "org.apache.carbondata.hive.MapredCarbonInputFormat"
-OUTPUTFORMAT "org.apache.carbondata.hive.MapredCarbonOutputFormat"
-SERDE "org.apache.carbondata.hive.CarbonHiveSerDe";
-
-alter table hive_carbon set LOCATION '<hdfs store 
path>/carbon/store/default/hive_carbon';
-```
-
 ### Query data from hive table
 
 ```

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cbad24/integration/hive/pom.xml
----------------------------------------------------------------------
diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml
index 12a1520..a5ba448 100644
--- a/integration/hive/pom.xml
+++ b/integration/hive/pom.xml
@@ -105,12 +105,18 @@
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.11</artifactId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+          <version>${spark.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
@@ -136,6 +142,12 @@
             <groupId>org.apache.carbondata</groupId>
             <artifactId>carbondata-hadoop</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-sql_2.10</artifactId>
+              </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cbad24/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala
----------------------------------------------------------------------
diff --git 
a/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala
 
b/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala
new file mode 100644
index 0000000..fd686ae
--- /dev/null
+++ 
b/integration/hive/src/main/scala/org/apache/carbondata/hive/CarbonHiveMetastoreListener.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hive
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.metastore.MetaStorePreEventListener
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, MetaException}
+import org.apache.hadoop.hive.metastore.events._
+import org.apache.hadoop.hive.metastore.events.PreEventContext.PreEventType._
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+
/**
 * Hive metastore pre-event listener that keeps the Hive view of a table in sync
 * with the schema Spark records for CarbonData tables.
 *
 * Spark serializes a table's schema JSON into table properties
 * (`spark.sql.sources.schema.numParts` / `spark.sql.sources.schema.part.N`).
 * When a table whose `spark.sql.sources.provider` is CarbonSource is created,
 * this listener rebuilds the Hive column list from those properties and points
 * the storage descriptor at the CarbonData input/output formats and serde, so
 * the table is directly queryable from Hive.
 *
 * Enabled via `hive.metastore.pre.event.listeners` in hive-site.xml.
 */
class CarbonHiveMetastoreListener(conf: Configuration) extends MetaStorePreEventListener(conf) {

  override def onEvent(preEventContext: PreEventContext): Unit = {
    preEventContext.getEventType match {
      case CREATE_TABLE =>
        // A CREATE_TABLE event is always carried by a PreCreateTableEvent.
        val table = preEventContext.asInstanceOf[PreCreateTableEvent].getTable
        val tableProps = table.getParameters
        // Only rewrite tables created by Spark with the CarbonData source.
        if (tableProps != null &&
            tableProps.get("spark.sql.sources.provider") == "org.apache.spark.sql.CarbonSource") {
          val numSchemaParts = tableProps.get("spark.sql.sources.schema.numParts")
          if (numSchemaParts != null && numSchemaParts.nonEmpty) {
            // Report a malformed part count as a MetaException — the error type
            // metastore clients expect (and the type the missing-part check
            // below already uses) — rather than a raw NumberFormatException.
            val partCount =
              try numSchemaParts.trim.toInt
              catch {
                case _: NumberFormatException =>
                  throw new MetaException(
                    s"spark.sql.sources.schema.numParts is not a number: $numSchemaParts")
              }
            // Spark splits a large schema JSON across several properties;
            // collect every part, failing fast if one is missing.
            val parts = (0 until partCount).map { index =>
              val part = tableProps.get(s"spark.sql.sources.schema.part.${index}")
              if (part == null) {
                throw new MetaException(s"spark.sql.sources.schema.part.${index} is missing!")
              }
              part
            }
            // Concatenate the parts back into the full schema JSON and parse it.
            val schema = DataType.fromJson(parts.mkString).asInstanceOf[StructType]
            val hiveSchema = schema.map(toHiveColumn).asJava
            table.getSd.setCols(hiveSchema)
            // Point Hive at the CarbonData formats and serde.
            table.getSd.setInputFormat("org.apache.carbondata.hive.MapredCarbonInputFormat")
            table.getSd.setOutputFormat("org.apache.carbondata.hive.MapredCarbonOutputFormat")
            val serdeInfo = table.getSd.getSerdeInfo
            serdeInfo.setSerializationLib("org.apache.carbondata.hive.CarbonHiveSerDe")
            // CarbonSource records the physical store path in the serde
            // parameters; use it as the Hive table location when present.
            val tablePath = serdeInfo.getParameters.get("tablePath")
            if (tablePath != null) {
              table.getSd.setLocation(tablePath)
            }
          }
        }
      case _ =>
        // Other event types need no CarbonData-specific handling.
    }
  }

  /**
   * Converts a Spark [[StructField]] to a Hive [[FieldSchema]], preferring the
   * exact Hive type string Spark stashed in the field metadata over the generic
   * catalog string when it is available.
   */
  private def toHiveColumn(c: StructField): FieldSchema = {
    val typeString = if (c.metadata.contains("HIVE_TYPE_STRING")) {
      c.metadata.getString("HIVE_TYPE_STRING")
    } else {
      c.dataType.catalogString
    }
    new FieldSchema(c.name, typeString, c.getComment().orNull)
  }
}

Reply via email to