Repository: incubator-carbondata Updated Branches: refs/heads/master 70c1015e4 -> 14595bf69
support append in dataframe fix Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/4808378d Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/4808378d Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/4808378d Branch: refs/heads/master Commit: 4808378df44d7e04aed9b51258c966d8d26d9278 Parents: 70c1015 Author: jackylk <jacky.li...@huawei.com> Authored: Sat Oct 15 05:21:09 2016 +0800 Committer: jackylk <jacky.li...@huawei.com> Committed: Sat Oct 15 05:29:29 2016 +0800 ---------------------------------------------------------------------- .../examples/DataFrameAPIExample.scala | 9 +- .../carbondata/examples/util/ExampleUtils.scala | 33 ++-- .../spark/CarbonDataFrameWriter.scala | 165 +++++++++++++++++++ .../apache/carbondata/spark/CarbonOption.scala | 4 +- .../spark/implicit/DataFrameFuncs.scala | 159 ------------------ .../org/apache/carbondata/spark/package.scala | 27 --- .../spark/sql/CarbonDatasourceRelation.scala | 6 +- .../dataload/SparkDatasourceSuite.scala | 43 ++++- .../AllDataTypesTestCaseAggregate.scala | 12 +- 9 files changed, 245 insertions(+), 213 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala index 2d9193f..49fb0da 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala @@ -33,8 +33,13 @@ object DataFrameAPIExample { .load() import cc.implicits._ - val count = in.where($"c3" > 500).select($"*").count() - println(s"count using dataframe: $count") + var count = in.where($"c3" > 500).select($"*").count() + println(s"count after 1 load: $count") + + // append new data, query answer should be 1000 + ExampleUtils.appendSampleCarbonFile(cc, "carbon1") + count = in.where($"c3" > 500).select($"*").count() + println(s"count after 2 load: $count") // use SQL to read cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala index 0126c66..cfcdde8 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala @@ -55,24 +55,37 @@ object ExampleUtils { * This func will write a sample CarbonData file containing following schema: * c1: String, c2: String, c3: Double */ - def writeSampleCarbonFile(cc: CarbonContext, tableName: String): Unit = { + def writeSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): Unit = { + cc.sql(s"DROP TABLE IF EXISTS $tableName") + writeDataframe(cc, tableName, numRows, SaveMode.Overwrite) + } + + /** + * This func will append data to the CarbonData file + */ + def appendSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): Unit = { + writeDataframe(cc, tableName, numRows, SaveMode.Append) + } + + /** + * create a new dataframe and write to CarbonData file, based on save mode + */ + private def writeDataframe( + cc: CarbonContext, tableName: String, numRows: Int, mode: SaveMode): Unit = { // use CarbonContext to write CarbonData files import cc.implicits._ val sc = cc.sparkContext - // create a dataframe, it can be from parquet or hive table - val df = sc.parallelize(1 to 1000, 2) + val df = sc.parallelize(1 to numRows, 2) .map(x => ("a", "b", x)) .toDF("c1", "c2", "c3") - cc.sql(s"DROP TABLE IF EXISTS $tableName") - // save dataframe to carbon file df.write - .format("carbondata") - .option("tableName", tableName) - .option("compress", "true") - .mode(SaveMode.Overwrite) - .save() + .format("carbondata") + .option("tableName", tableName) + .option("compress", "true") + .mode(mode) + .save() } } // scalastyle:on println http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala new file mode 100644 index 0000000..65ff787 --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark + +import org.apache.hadoop.fs.Path +import org.apache.spark.Logging +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.command.LoadTable +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType} + +class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging { + + def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = { + checkContext() + val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext) + + // create a new table using dataframe's schema and write its content into the table + cc.sql(makeCreateTableString(dataFrame.schema, new CarbonOption(parameters))) + writeToCarbonFile(parameters) + } + + def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = { + // append the data as a new load + checkContext() + writeToCarbonFile(parameters) + } + + private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = { + val options = new CarbonOption(parameters) + val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext) + if (options.tempCSV) { + loadTempCSV(options, cc) + } else { + loadDataFrame(options, cc) + } + } + + /** + * Firstly, saving DataFrame to CSV files + * Secondly, load CSV files + * @param options + * @param cc + */ + private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = { + // temporary solution: write to csv file, then load the csv into carbon + val tempCSVFolder = "./tempCSV" + writeToTempCSVFile(tempCSVFolder, options) + + val tempCSVPath = new Path(tempCSVFolder) + val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration) + + def countSize(): Double = { + var size: Double = 0 + val itor = fs.listFiles(tempCSVPath, true) + while (itor.hasNext) { + val f = itor.next() + if (f.getPath.getName.startsWith("part-")) { + size += f.getLen + } + } + size + } + + logInfo(s"temporary CSV file size: ${countSize / 1024 / 1024} MB") + + try { + cc.sql(makeLoadString(tempCSVFolder, options)) + } finally { + fs.delete(tempCSVPath, true) + } + } + + private def checkContext(): Unit = { + // To avoid derby problem, dataframe need to be writen and read using CarbonContext + require(dataFrame.sqlContext.isInstanceOf[CarbonContext], + "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe" + ) + } + + private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = { + var writer: DataFrameWriter = + dataFrame.write + .format(csvPackage) + .option("header", "false") + .mode(SaveMode.Overwrite) + + if (options.compress) { + writer = writer.option("codec", "gzip") + } + + writer.save(tempCSVFolder) + } + + /** + * Loading DataFrame directly without saving DataFrame to CSV files. + * @param options + * @param cc + */ + private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = { + val header = dataFrame.columns.mkString(",") + LoadTable( + Some(options.dbName), + options.tableName, + null, + Seq(), + Map(("fileheader" -> header)), + false, + null, + Some(dataFrame)).run(cc) + } + + private def csvPackage: String = "com.databricks.spark.csv.newapi" + + private def convertToCarbonType(sparkType: DataType): String = { + sparkType match { + case StringType => CarbonType.STRING.name + case IntegerType => CarbonType.INT.name + case ByteType => CarbonType.INT.name + case ShortType => CarbonType.SHORT.name + case LongType => CarbonType.LONG.name + case FloatType => CarbonType.DOUBLE.name + case DoubleType => CarbonType.DOUBLE.name + case BooleanType => CarbonType.DOUBLE.name + case TimestampType => CarbonType.TIMESTAMP.name + case other => sys.error(s"unsupported type: $other") + } + } + + private def makeCreateTableString(schema: StructType, options: CarbonOption): String = { + val carbonSchema = schema.map { field => + s"${ field.name } ${ convertToCarbonType(field.dataType) }" + } + s""" + CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName} + (${ carbonSchema.mkString(", ") }) + STORED BY '${ CarbonContext.datasourceName }' + """ + } + + private def makeLoadString(csvFolder: String, options: CarbonOption): String = { + s""" + LOAD DATA INPATH '$csvFolder' + INTO TABLE ${options.dbName}.${options.tableName} + OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}') + """ + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala index b93841f..71b18b9 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala @@ -38,8 +38,8 @@ class CarbonOption(options: Map[String, String]) { "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl") } - def tempCSV: String = options.getOrElse("tempCSV", "true") + def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean - def compress: String = options.getOrElse("compress", "false") + def compress: Boolean = options.getOrElse("compress", "false").toBoolean } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala deleted file mode 100644 index 8585e07..0000000 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark - -import org.apache.hadoop.fs.Path -import org.apache.spark.Logging -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.command.LoadTable -import org.apache.spark.sql.types._ - -import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType} - -class DataFrameFuncs(dataFrame: DataFrame) extends Logging { - - /** - * Saves DataFrame as CarbonData files. - */ - def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = { - // To avoid derby problem, dataframe need to be writen and read using CarbonContext - require(dataFrame.sqlContext.isInstanceOf[CarbonContext], - "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe" - ) - - val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext) - val options = new CarbonOption(parameters) - cc.sql(makeCreateTableString(dataFrame.schema, options)) - - if (options.tempCSV.equals("true")) { - loadTempCSV(options, cc) - } else { - loadDataFrame(options, cc) - } - } - - /** - * Firstly, saving DataFrame to CSV files - * Secondly, load CSV files - * @param options - * @param cc - */ - private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = { - // temporary solution: write to csv file, then load the csv into carbon - val tempCSVFolder = s"./tempCSV" - var writer: DataFrameWriter = - dataFrame.write - .format(csvPackage) - .option("header", "false") - .mode(SaveMode.Overwrite) - - if (options.compress.equals("true")) { - writer = writer.option("codec", "gzip") - } - - writer.save(tempCSVFolder) - - val tempCSVPath = new Path(tempCSVFolder) - val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration) - - def countSize(): Double = { - var size: Double = 0 - val itor = fs.listFiles(tempCSVPath, true) - while (itor.hasNext) { - val f = itor.next() - if (f.getPath.getName.startsWith("part-")) { - size += f.getLen - } - } - size - } - - try { - logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB") - cc.sql(makeLoadString(options.tableName, tempCSVFolder)) - } finally { - fs.delete(tempCSVPath, true) - } - } - - /** - * Loading DataFrame directly without saving DataFrame to CSV files. - * @param options - * @param cc - */ - private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = { - val header = dataFrame.columns.mkString(",") - LoadTable( - Some(options.dbName), - options.tableName, - null, - Seq(), - Map(("fileheader" -> header)), - false, - null, - Some(dataFrame)).run(cc) - } - - private def csvPackage: String = "com.databricks.spark.csv.newapi" - - private def convertToCarbonType(sparkType: DataType): String = { - sparkType match { - case StringType => CarbonType.STRING.name - case IntegerType => CarbonType.INT.name - case ByteType => CarbonType.INT.name - case ShortType => CarbonType.SHORT.name - case LongType => CarbonType.LONG.name - case FloatType => CarbonType.DOUBLE.name - case DoubleType => CarbonType.DOUBLE.name - case BooleanType => CarbonType.DOUBLE.name - case TimestampType => CarbonType.TIMESTAMP.name - case other => sys.error(s"unsupported type: $other") - } - } - - private def makeCreateTableString(schema: StructType, option: CarbonOption): String = { - val tableName = option.tableName - val dbName = option.dbName - val carbonSchema = schema.map { field => - s"${ field.name } ${ convertToCarbonType(field.dataType) }" - } - s""" - CREATE TABLE IF NOT EXISTS $dbName.$tableName - (${ carbonSchema.mkString(", ") }) - STORED BY '${ CarbonContext.datasourceName }' - """ - } - - private def makeLoadString(tableName: String, csvFolder: String): String = { - s""" - LOAD DATA INPATH '$csvFolder' - INTO TABLE $tableName - OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}') - """ - } - - def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = { - // find out table - // find out streaming segment - // for each rdd partition, find out the appendable carbon file - // check whether it is full - // if full, create new file - // append to it: create blocklet header and data, call thrift to convert, write hdfs - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala deleted file mode 100644 index b1b91e7..0000000 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata - -import org.apache.spark.sql.DataFrame - -package object spark { - - implicit def toDataFrameFuncs(dataFrame: DataFrame): DataFrameFuncs = { - new DataFrameFuncs(dataFrame) - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala index ef0e497..3441777 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala @@ -106,10 +106,10 @@ class CarbonSource extends RelationProvider } if (doSave) { - // Only save data when the save mode is Overwrite. - data.saveAsCarbonFile(parameters) + // save data when the save mode is Overwrite. + new CarbonDataFrameWriter(data).saveAsCarbonFile(parameters) } else if (doAppend) { - data.appendToCarbonFile(parameters) + new CarbonDataFrameWriter(data).appendToCarbonFile(parameters) } createRelation(sqlContext, parameters) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala index ebcaf75..1da0533 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala @@ -68,13 +68,46 @@ class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll { assert(in.where("c3 > 500").count() == 500) } - test("saveAsCarbon API") { - import org.apache.carbondata.spark._ - df.saveAsCarbonFile(Map("tableName" -> "carbon2")) - - checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900))) + test("test overwrite") { + sql("DROP TABLE IF EXISTS carbon4") + df.write + .format("carbondata") + .option("tableName", "carbon4") + .mode(SaveMode.Overwrite) + .save() + df.write + .format("carbondata") + .option("tableName", "carbon4") + .mode(SaveMode.Overwrite) + .save() + val in = read + .format("carbondata") + .option("tableName", "carbon4") + .load() + assert(in.where("c3 > 500").count() == 500) + sql("DROP TABLE IF EXISTS carbon4") } + test("read and write using CarbonContext, multiple load") { + sql("DROP TABLE IF EXISTS carbon4") + df.write + .format("carbondata") + .option("tableName", "carbon4") + .mode(SaveMode.Overwrite) + .save() + df.write + .format("carbondata") + .option("tableName", "carbon4") + .mode(SaveMode.Append) + .save() + val in = read + .format("carbondata") + .option("tableName", "carbon4") + .load() + assert(in.where("c3 > 500").count() == 1000) + sql("DROP TABLE IF EXISTS carbon4") + } + test("query using SQLContext") { val sqlContext = new SQLContext(sparkContext) sqlContext.sql( http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala index d4def47..cc00c47 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala @@ -21,11 +21,10 @@ package org.apache.carbondata.spark.testsuite.allqueries import java.io.File -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Row, SaveMode} import org.apache.spark.sql.common.util.CarbonHiveContext._ import org.apache.spark.sql.common.util.QueryTest import org.scalatest.BeforeAndAfterAll - import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties @@ -1091,10 +1090,13 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll { test("CARBONDATA-60-union-defect")({ sql("drop table if exists carbonunion") import implicits._ - val df=sc.parallelize(1 to 1000).map(x => (x+"", (x+100)+"")).toDF("c1", "c2") + val df = sc.parallelize(1 to 1000).map(x => (x+"", (x+100)+"")).toDF("c1", "c2") df.registerTempTable("sparkunion") - import org.apache.carbondata.spark._ - df.saveAsCarbonFile(Map("tableName" -> "carbonunion")) + df.write + .format("carbondata") + .mode(SaveMode.Overwrite) + .option("tableName", "carbonunion") + .save() checkAnswer( sql("select c1,count(c1) from (select c1 as c1,c2 as c2 from carbonunion union all select c2 as c1,c1 as c2 from carbonunion)t where c1='200' group by c1"),