[CARBONDATA-2110] Deprecate the 'tempCSV' option of DataFrame load. With this change, loading a DataFrame no longer generates a temporary CSV file on HDFS, regardless of the value of 'tempCSV'.
This closes #1916 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da129d52 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da129d52 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da129d52 Branch: refs/heads/branch-1.3 Commit: da129d5277babe498fa5686fe53d01433d112bab Parents: 6c097cb Author: qiuchenjian <807169...@qq.com> Authored: Sat Feb 3 00:14:07 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Sat Feb 3 15:29:08 2018 +0800 ---------------------------------------------------------------------- .../testsuite/dataload/TestLoadDataFrame.scala | 19 ++++ .../spark/sql/CarbonDataFrameWriter.scala | 98 +------------------- 2 files changed, 20 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/da129d52/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala index 6f03493..693c145 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala @@ -29,6 +29,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { var df: DataFrame = _ var dataFrame: DataFrame = _ var df2: DataFrame = _ + var df3: DataFrame = _ var booldf:DataFrame = _ @@ -52,6 +53,10 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { .map(x => ("key_" + x, "str_" + x, x, x * 2, x * 3)) 
.toDF("c1", "c2", "c3", "c4", "c5") + df3 = sqlContext.sparkContext.parallelize(1 to 3) + .map(x => (x.toString + "te,s\nt", x)) + .toDF("c1", "c2") + val boolrdd = sqlContext.sparkContext.parallelize( Row("anubhav",true) :: Row("prince",false) :: Nil) @@ -74,6 +79,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS carbon9") sql("DROP TABLE IF EXISTS carbon10") sql("DROP TABLE IF EXISTS carbon11") + sql("DROP TABLE IF EXISTS carbon12") sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified") sql("DROP TABLE IF EXISTS df_write_specify_sort_column") sql("DROP TABLE IF EXISTS df_write_empty_sort_column") @@ -261,6 +267,19 @@ test("test the boolean data type"){ val isStreaming: String = descResult.collect().find(row=>row(0).asInstanceOf[String].trim.equalsIgnoreCase("streaming")).get.get(1).asInstanceOf[String] assert(isStreaming.contains("true")) } + + test("test datasource table with specified char") { + + df3.write + .format("carbondata") + .option("tableName", "carbon12") + .option("tempCSV", "true") + .mode(SaveMode.Overwrite) + .save() + checkAnswer( + sql("select count(*) from carbon12"), Row(3) + ) + } private def getSortColumnValue(tableName: String): Array[String] = { val desc = sql(s"desc formatted $tableName") val sortColumnRow = desc.collect.find(r => http://git-wip-us.apache.org/repos/asf/carbondata/blob/da129d52/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index 2b06375..2be89b1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -17,16 +17,12 @@ package org.apache.spark.sql -import 
org.apache.hadoop.fs.Path -import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType} -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.CarbonOption class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { @@ -46,90 +42,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = { val options = new CarbonOption(parameters) - if (options.tempCSV) { - loadTempCSV(options) - } else { - loadDataFrame(options) - } + loadDataFrame(options) } - - /** - * Firstly, saving DataFrame to CSV files - * Secondly, load CSV files - * @param options - */ - private def loadTempCSV(options: CarbonOption): Unit = { - // temporary solution: write to csv file, then load the csv into carbon - val storePath = CarbonProperties.getStorePath - val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR) - .append("tempCSV") - .append(CarbonCommonConstants.UNDERSCORE) - .append(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)) - .append(CarbonCommonConstants.UNDERSCORE) - .append(options.tableName) - .append(CarbonCommonConstants.UNDERSCORE) - .append(System.nanoTime()) - .toString - writeToTempCSVFile(tempCSVFolder, options) - - val tempCSVPath = new Path(tempCSVFolder) - val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration) - - def countSize(): Double = { - var size: Double = 0 - val itor = fs.listFiles(tempCSVPath, true) - while (itor.hasNext) { - val f = itor.next() - if 
(f.getPath.getName.startsWith("part-")) { - size += f.getLen - } - } - size - } - - LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB") - - try { - sqlContext.sql(makeLoadString(tempCSVFolder, options)) - } finally { - fs.delete(tempCSVPath, true) - } - } - - private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = { - val strRDD = dataFrame.rdd.mapPartitions { case iter => - new Iterator[String] { - override def hasNext = iter.hasNext - - def convertToCSVString(seq: Seq[Any]): String = { - val build = new java.lang.StringBuilder() - if (seq.head != null) { - build.append(seq.head.toString) - } - val itemIter = seq.tail.iterator - while (itemIter.hasNext) { - build.append(CarbonCommonConstants.COMMA) - val value = itemIter.next() - if (value != null) { - build.append(value.toString) - } - } - build.toString - } - - override def next: String = { - convertToCSVString(iter.next.toSeq) - } - } - } - - if (options.compress) { - strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec]) - } else { - strRDD.saveAsTextFile(tempCSVFolder) - } - } - /** * Loading DataFrame directly without saving DataFrame to CSV files. * @param options @@ -189,14 +103,4 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { """.stripMargin } - private def makeLoadString(csvFolder: String, options: CarbonOption): String = { - val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession) - s""" - | LOAD DATA INPATH '$csvFolder' - | INTO TABLE $dbName.${options.tableName} - | OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}', - | 'SINGLE_PASS' = '${options.singlePass}') - """.stripMargin - } - }