Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 70c1015e4 -> 14595bf69


Support append mode (SaveMode.Append) when writing a DataFrame to CarbonData

fix
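
For context, the user-facing flow this commit enables, as exercised by the new
tests below (a minimal sketch; df is assumed to be a DataFrame created through a
CarbonContext, and the table name is illustrative):

    import org.apache.spark.sql.SaveMode

    // first write creates the table, second write appends a new load (segment)
    df.write
      .format("carbondata")
      .option("tableName", "carbon1")
      .mode(SaveMode.Overwrite)
      .save()
    df.write
      .format("carbondata")
      .option("tableName", "carbon1")
      .mode(SaveMode.Append)
      .save()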


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/4808378d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/4808378d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/4808378d

Branch: refs/heads/master
Commit: 4808378df44d7e04aed9b51258c966d8d26d9278
Parents: 70c1015
Author: jackylk <jacky.li...@huawei.com>
Authored: Sat Oct 15 05:21:09 2016 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Sat Oct 15 05:29:29 2016 +0800

----------------------------------------------------------------------
 .../examples/DataFrameAPIExample.scala          |   9 +-
 .../carbondata/examples/util/ExampleUtils.scala |  33 ++--
 .../spark/CarbonDataFrameWriter.scala           | 165 +++++++++++++++++++
 .../apache/carbondata/spark/CarbonOption.scala  |   4 +-
 .../spark/implicit/DataFrameFuncs.scala         | 159 ------------------
 .../org/apache/carbondata/spark/package.scala   |  27 ---
 .../spark/sql/CarbonDatasourceRelation.scala    |   6 +-
 .../dataload/SparkDatasourceSuite.scala         |  43 ++++-
 .../AllDataTypesTestCaseAggregate.scala         |  12 +-
 9 files changed, 245 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index 2d9193f..49fb0da 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -33,8 +33,13 @@ object DataFrameAPIExample {
       .load()
 
     import cc.implicits._
-    val count = in.where($"c3" > 500).select($"*").count()
-    println(s"count using dataframe: $count")
+    var count = in.where($"c3" > 500).select($"*").count()
+    println(s"count after 1 load: $count")
+
+    // append new data, query answer should be 1000
+    ExampleUtils.appendSampleCarbonFile(cc, "carbon1")
+    count = in.where($"c3" > 500).select($"*").count()
+    println(s"count after 2 load: $count")
 
     // use SQL to read
     cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show
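
Since each load writes rows with c3 ranging from 1 to 1000 (see ExampleUtils
below), the filter c3 > 500 matches 500 rows per load: the first count prints
500 and, after the append, the second prints 1000.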

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
index 0126c66..cfcdde8 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
@@ -55,24 +55,37 @@ object ExampleUtils {
    * This func will write a sample CarbonData file containing following schema:
    * c1: String, c2: String, c3: Double
    */
-  def writeSampleCarbonFile(cc: CarbonContext, tableName: String): Unit = {
+  def writeSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): Unit = {
+    cc.sql(s"DROP TABLE IF EXISTS $tableName")
+    writeDataframe(cc, tableName, numRows, SaveMode.Overwrite)
+  }
+
+  /**
+   * This function appends sample data to the CarbonData file
+   */
+  def appendSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): Unit = {
+    writeDataframe(cc, tableName, numRows, SaveMode.Append)
+  }
+
+  /**
+   * Create a new dataframe and write it to the CarbonData file, using the given save mode
+   */
+  private def writeDataframe(
+      cc: CarbonContext, tableName: String, numRows: Int, mode: SaveMode): Unit = {
     // use CarbonContext to write CarbonData files
     import cc.implicits._
     val sc = cc.sparkContext
-    // create a dataframe, it can be from parquet or hive table
-    val df = sc.parallelize(1 to 1000, 2)
+    val df = sc.parallelize(1 to numRows, 2)
         .map(x => ("a", "b", x))
         .toDF("c1", "c2", "c3")
 
-    cc.sql(s"DROP TABLE IF EXISTS $tableName")
-
     // save dataframe to carbon file
     df.write
-      .format("carbondata")
-      .option("tableName", tableName)
-      .option("compress", "true")
-      .mode(SaveMode.Overwrite)
-      .save()
+        .format("carbondata")
+        .option("tableName", tableName)
+        .option("compress", "true")
+        .mode(mode)
+        .save()
   }
 }
 // scalastyle:on println
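
A sketch of how the two helpers compose, as DataFrameAPIExample above does
(assuming cc is a CarbonContext):

    // write 1000 rows into table "carbon1", then append 1000 more;
    // both helpers delegate to writeDataframe with the corresponding SaveMode
    ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
    ExampleUtils.appendSampleCarbonFile(cc, "carbon1")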

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
new file mode 100644
index 0000000..65ff787
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
+
+class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging {
+
+  def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+    checkContext()
+    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
+
+    // create a new table using dataframe's schema and write its content into the table
+    cc.sql(makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)))
+    writeToCarbonFile(parameters)
+  }
+
+  def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+    // append the data as a new load
+    checkContext()
+    writeToCarbonFile(parameters)
+  }
+
+  private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+    val options = new CarbonOption(parameters)
+    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
+    if (options.tempCSV) {
+      loadTempCSV(options, cc)
+    } else {
+      loadDataFrame(options, cc)
+    }
+  }
+
+  /**
+   * First save the DataFrame to temporary CSV files, then load those CSV files
+   * into the table.
+   * @param options write options, such as table name and compression
+   * @param cc CarbonContext used to run the load command
+   */
+  private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
+    // temporary solution: write to csv file, then load the csv into carbon
+    val tempCSVFolder = "./tempCSV"
+    writeToTempCSVFile(tempCSVFolder, options)
+
+    val tempCSVPath = new Path(tempCSVFolder)
+    val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
+
+    def countSize(): Double = {
+      var size: Double = 0
+      val itor = fs.listFiles(tempCSVPath, true)
+      while (itor.hasNext) {
+        val f = itor.next()
+        if (f.getPath.getName.startsWith("part-")) {
+          size += f.getLen
+        }
+      }
+      size
+    }
+
+    logInfo(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
+
+    try {
+      cc.sql(makeLoadString(tempCSVFolder, options))
+    } finally {
+      fs.delete(tempCSVPath, true)
+    }
+  }
+
+  private def checkContext(): Unit = {
+    // To avoid the Derby metastore problem, the dataframe needs to be written and read using CarbonContext
+    require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
+      "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
+    )
+  }
+
+  private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
+    var writer: DataFrameWriter =
+      dataFrame.write
+        .format(csvPackage)
+        .option("header", "false")
+        .mode(SaveMode.Overwrite)
+
+    if (options.compress) {
+      writer = writer.option("codec", "gzip")
+    }
+
+    writer.save(tempCSVFolder)
+  }
+
+  /**
+   * Load the DataFrame directly, without writing intermediate CSV files.
+   * @param options write options, such as table name
+   * @param cc CarbonContext used to run the load
+   */
+  private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = {
+    val header = dataFrame.columns.mkString(",")
+    LoadTable(
+      Some(options.dbName),
+      options.tableName,
+      null,
+      Seq(),
+      Map(("fileheader" -> header)),
+      false,
+      null,
+      Some(dataFrame)).run(cc)
+  }
+
+  private def csvPackage: String = "com.databricks.spark.csv.newapi"
+
+  private def convertToCarbonType(sparkType: DataType): String = {
+    sparkType match {
+      case StringType => CarbonType.STRING.name
+      case IntegerType => CarbonType.INT.name
+      case ByteType => CarbonType.INT.name
+      case ShortType => CarbonType.SHORT.name
+      case LongType => CarbonType.LONG.name
+      case FloatType => CarbonType.DOUBLE.name
+      case DoubleType => CarbonType.DOUBLE.name
+      case BooleanType => CarbonType.DOUBLE.name
+      case TimestampType => CarbonType.TIMESTAMP.name
+      case other => sys.error(s"unsupported type: $other")
+    }
+  }
+
+  private def makeCreateTableString(schema: StructType, options: CarbonOption): String = {
+    val carbonSchema = schema.map { field =>
+      s"${ field.name } ${ convertToCarbonType(field.dataType) }"
+    }
+    s"""
+          CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
+          (${ carbonSchema.mkString(", ") })
+          STORED BY '${ CarbonContext.datasourceName }'
+      """
+  }
+
+  private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
+    s"""
+          LOAD DATA INPATH '$csvFolder'
+          INTO TABLE ${options.dbName}.${options.tableName}
+          OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
+      """
+  }
+
+}
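
Note that writeToCarbonFile chooses between two load paths: with tempCSV=true
(the default) the DataFrame is first materialized as temporary CSV files and
loaded via LOAD DATA; with tempCSV=false it is handed to LoadTable directly.
A minimal sketch of opting into the direct path (the table name is
illustrative):

    df.write
      .format("carbondata")
      .option("tableName", "t1")
      .option("tempCSV", "false")  // skip the temporary CSV step, load the DataFrame directly
      .mode(SaveMode.Overwrite)
      .save()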

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index b93841f..71b18b9 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -38,8 +38,8 @@ class CarbonOption(options: Map[String, String]) {
       "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
   }
 
-  def tempCSV: String = options.getOrElse("tempCSV", "true")
+  def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean
 
-  def compress: String = options.getOrElse("compress", "false")
+  def compress: Boolean = options.getOrElse("compress", "false").toBoolean
 
 }
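
The two accessors now return Boolean instead of String, so call sites test a
flag rather than comparing strings (a small sketch; the map values are
illustrative):

    // options arrive as strings from DataFrameWriter.option(...) and are parsed once here
    val opts = new CarbonOption(Map("tempCSV" -> "false", "compress" -> "true"))
    assert(!opts.tempCSV)  // Boolean now, no .equals("true") at call sites
    assert(opts.compress)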

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
deleted file mode 100644
index 8585e07..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.hadoop.fs.Path
-import org.apache.spark.Logging
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.command.LoadTable
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
-
-class DataFrameFuncs(dataFrame: DataFrame) extends Logging {
-
-  /**
-   * Saves DataFrame as CarbonData files.
-   */
-  def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
-    // To avoid derby problem, dataframe need to be writen and read using CarbonContext
-    require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
-      "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
-    )
-
-    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
-    val options = new CarbonOption(parameters)
-    cc.sql(makeCreateTableString(dataFrame.schema, options))
-
-    if (options.tempCSV.equals("true")) {
-      loadTempCSV(options, cc)
-    } else {
-      loadDataFrame(options, cc)
-    }
-  }
-
-  /**
-   * Firstly, saving DataFrame to CSV files
-   * Secondly, load CSV files
-   * @param options
-   * @param cc
-   */
-  private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
-    // temporary solution: write to csv file, then load the csv into carbon
-    val tempCSVFolder = s"./tempCSV"
-    var writer: DataFrameWriter =
-      dataFrame.write
-        .format(csvPackage)
-        .option("header", "false")
-        .mode(SaveMode.Overwrite)
-
-    if (options.compress.equals("true")) {
-      writer = writer.option("codec", "gzip")
-    }
-
-    writer.save(tempCSVFolder)
-
-    val tempCSVPath = new Path(tempCSVFolder)
-    val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
-
-    def countSize(): Double = {
-      var size: Double = 0
-      val itor = fs.listFiles(tempCSVPath, true)
-      while (itor.hasNext) {
-        val f = itor.next()
-        if (f.getPath.getName.startsWith("part-")) {
-          size += f.getLen
-        }
-      }
-      size
-    }
-
-    try {
-      logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")
-      cc.sql(makeLoadString(options.tableName, tempCSVFolder))
-    } finally {
-      fs.delete(tempCSVPath, true)
-    }
-  }
-
-  /**
-   * Loading DataFrame directly without saving DataFrame to CSV files.
-   * @param options
-   * @param cc
-   */
-  private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = {
-    val header = dataFrame.columns.mkString(",")
-    LoadTable(
-      Some(options.dbName),
-      options.tableName,
-      null,
-      Seq(),
-      Map(("fileheader" -> header)),
-      false,
-      null,
-      Some(dataFrame)).run(cc)
-  }
-
-  private def csvPackage: String = "com.databricks.spark.csv.newapi"
-
-  private def convertToCarbonType(sparkType: DataType): String = {
-    sparkType match {
-      case StringType => CarbonType.STRING.name
-      case IntegerType => CarbonType.INT.name
-      case ByteType => CarbonType.INT.name
-      case ShortType => CarbonType.SHORT.name
-      case LongType => CarbonType.LONG.name
-      case FloatType => CarbonType.DOUBLE.name
-      case DoubleType => CarbonType.DOUBLE.name
-      case BooleanType => CarbonType.DOUBLE.name
-      case TimestampType => CarbonType.TIMESTAMP.name
-      case other => sys.error(s"unsupported type: $other")
-    }
-  }
-
-  private def makeCreateTableString(schema: StructType, option: CarbonOption): String = {
-    val tableName = option.tableName
-    val dbName = option.dbName
-    val carbonSchema = schema.map { field =>
-      s"${ field.name } ${ convertToCarbonType(field.dataType) }"
-    }
-    s"""
-          CREATE TABLE IF NOT EXISTS $dbName.$tableName
-          (${ carbonSchema.mkString(", ") })
-          STORED BY '${ CarbonContext.datasourceName }'
-      """
-  }
-
-  private def makeLoadString(tableName: String, csvFolder: String): String = {
-    s"""
-          LOAD DATA INPATH '$csvFolder'
-          INTO TABLE $tableName
-          OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
-      """
-  }
-
-  def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
-    // find out table
-    // find out streaming segment
-    // for each rdd partition, find out the appendable carbon file
-    // check whether it is full
-    // if full, create new file
-    // append to it: create blocklet header and data, call thrift to convert, write hdfs
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
deleted file mode 100644
index b1b91e7..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata
-
-import org.apache.spark.sql.DataFrame
-
-package object spark {
-
-  implicit def toDataFrameFuncs(dataFrame: DataFrame): DataFrameFuncs = {
-    new DataFrameFuncs(dataFrame)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index ef0e497..3441777 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -106,10 +106,10 @@ class CarbonSource extends RelationProvider
     }
 
     if (doSave) {
-      // Only save data when the save mode is Overwrite.
-      data.saveAsCarbonFile(parameters)
+      // Save data when the save mode is Overwrite.
+      new CarbonDataFrameWriter(data).saveAsCarbonFile(parameters)
     } else if (doAppend) {
-      data.appendToCarbonFile(parameters)
+      new CarbonDataFrameWriter(data).appendToCarbonFile(parameters)
     }
 
     createRelation(sqlContext, parameters)
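
CarbonSource now routes both modes through CarbonDataFrameWriter. A hypothetical
sketch of the dispatch (the real doSave/doAppend flags are computed earlier in
createRelation, outside this hunk):

    // hypothetical: how the flags typically derive from the SaveMode
    mode match {
      case SaveMode.Overwrite => new CarbonDataFrameWriter(data).saveAsCarbonFile(parameters)
      case SaveMode.Append    => new CarbonDataFrameWriter(data).appendToCarbonFile(parameters)
      case _                  => // ErrorIfExists / Ignore are handled separately
    }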

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
index ebcaf75..1da0533 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
@@ -68,13 +68,46 @@ class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
     assert(in.where("c3 > 500").count() == 500)
   }
 
-  test("saveAsCarbon API") {
-    import org.apache.carbondata.spark._
-    df.saveAsCarbonFile(Map("tableName" -> "carbon2"))
-
-    checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
+  test("test overwrite") {
+    sql("DROP TABLE IF EXISTS carbon4")
+    df.write
+        .format("carbondata")
+        .option("tableName", "carbon4")
+        .mode(SaveMode.Overwrite)
+        .save()
+    df.write
+        .format("carbondata")
+        .option("tableName", "carbon4")
+        .mode(SaveMode.Overwrite)
+        .save()
+    val in = read
+        .format("carbondata")
+        .option("tableName", "carbon4")
+        .load()
+    assert(in.where("c3 > 500").count() == 500)
+    sql("DROP TABLE IF EXISTS carbon4")
   }
 
+  test("read and write using CarbonContext, multiple load") {
+    sql("DROP TABLE IF EXISTS carbon4")
+    df.write
+        .format("carbondata")
+        .option("tableName", "carbon4")
+        .mode(SaveMode.Overwrite)
+        .save()
+    df.write
+        .format("carbondata")
+        .option("tableName", "carbon4")
+        .mode(SaveMode.Append)
+        .save()
+    val in = read
+        .format("carbondata")
+        .option("tableName", "carbon4")
+        .load()
+    assert(in.where("c3 > 500").count() == 1000)
+    sql("DROP TABLE IF EXISTS carbon4")
+  }
+  
   test("query using SQLContext") {
     val sqlContext = new SQLContext(sparkContext)
     sqlContext.sql(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4808378d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index d4def47..cc00c47 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -21,11 +21,10 @@ package org.apache.carbondata.spark.testsuite.allqueries
 
 import java.io.File
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.common.util.CarbonHiveContext._
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
@@ -1091,10 +1090,13 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
   test("CARBONDATA-60-union-defect")({
     sql("drop table if exists carbonunion")
     import implicits._
-    val df=sc.parallelize(1 to 1000).map(x => (x+"", (x+100)+"")).toDF("c1", "c2")
+    val df = sc.parallelize(1 to 1000).map(x => (x+"", (x+100)+"")).toDF("c1", "c2")
     df.registerTempTable("sparkunion")
-    import org.apache.carbondata.spark._
-    df.saveAsCarbonFile(Map("tableName" -> "carbonunion"))
+    df.write
+      .format("carbondata")
+      .mode(SaveMode.Overwrite)
+      .option("tableName", "carbonunion")
+      .save()
 
     checkAnswer(
      sql("select c1,count(c1) from (select c1 as c1,c2 as c2 from carbonunion union all select c2 as c1,c1 as c2 from carbonunion)t where c1='200' group by c1"),
