Repository: carbondata Updated Branches: refs/heads/master 682cac85a -> c7c83684b
[CARBONDATA-3028][32k] Fix bugs in spark file format table with blanks in longstringcolumns If we create a spark file format table with multiple longstringcolumns and the option long_string_columns contains blank characters, the query on that table will fail, because it didn't recognize the correct varchar columns. The root cause is that carbondata didn't trim the blanks in long_string_columns while recognizing the varchar columns. This closes #2834 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c7c83684 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c7c83684 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c7c83684 Branch: refs/heads/master Commit: c7c83684b0237e3eec9d6c3af904293e207e733a Parents: 682cac8 Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Fri Oct 19 10:37:23 2018 +0800 Committer: QiangCai <qiang...@qq.com> Committed: Tue Oct 23 16:32:24 2018 +0800 ---------------------------------------------------------------------- ...tCreateTableUsingSparkCarbonFileFormat.scala | 38 ++++++++++---------- .../sdk/file/CarbonWriterBuilder.java | 25 +++++++------ 2 files changed, 33 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7c83684/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala index 755a7df..250e9a6 --- 
a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.carbondata.datasource import java.io.File import java.text.SimpleDateFormat -import java.util import java.util.{Date, Random} import scala.collection.JavaConverters._ @@ -30,17 +29,14 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.spark.util.SparkUtil import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants} import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter} import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} -import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.Row -import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.blocklet.DataFileFooter @@ -433,14 +429,16 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA .append("[ \n") .append(" {\"name\":\"string\"},\n") .append(" {\"address\":\"varchar\"},\n") - .append(" {\"age\":\"int\"}\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"note\":\"varchar\"}\n") .append("]") .toString() val builder = CarbonWriter.builder() val writer = 
builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build() - for (i <- 0 until 3) { + val totalRecordsNum = 3 + for (i <- 0 until totalRecordsNum) { // write a varchar with 75,000 length - writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString)) + writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString, RandomStringUtils.randomAlphabetic(75000))) } writer.close() @@ -449,19 +447,19 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA if (spark.sparkContext.version.startsWith("2.1")) { //data source file format spark.sql( - s"""CREATE TABLE sdkOutputTable (name string, address string, age int) - |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """ + s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) + |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """ .stripMargin) } else { //data source file format spark.sql( - s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon - |OPTIONS("long_String_columns"="address") LOCATION + s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING carbon + |OPTIONS("long_String_columns"="address, note") LOCATION |'$writerPath' """.stripMargin) } - assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1) - val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList() - assert(op.get(0).getString(0).length == 75000) + checkAnswer(spark.sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1))) + checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)=75000 and length(note)=75000)"), + Seq(Row(totalRecordsNum))) spark.sql("DROP TABLE sdkOutputTable") //--------------- data source external table without schema --------------------------- @@ -471,16 +469,16 @@ 
class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA spark .sql( s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH - |'$writerPath', "long_String_columns" "address") """.stripMargin) + |'$writerPath', "long_String_columns" "address, note") """.stripMargin) } else { //data source file format spark.sql( s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS - |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin) + |("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin) } - assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1) - val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList() - assert(op1.get(0).getString(0).length == 75000) + checkAnswer(spark.sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1))) + checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTableWithoutSchema where length(address)=75000 and length(note)=75000)"), + Seq(Row(totalRecordsNum))) spark.sql("DROP TABLE sdkOutputTableWithoutSchema") clearDataMapCache cleanTestData() http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7c83684/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 87930f6..ed2c956 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import 
org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -412,12 +413,17 @@ public class CarbonWriterBuilder { public CarbonLoadModel buildLoadModel(Schema carbonSchema) throws IOException, InvalidLoadOptionException { timestamp = System.nanoTime(); - Set<String> longStringColumns = null; - if (options != null && options.get("long_string_columns") != null) { - longStringColumns = - new HashSet<>(Arrays.asList(options.get("long_string_columns").toLowerCase().split(","))); + // validate long_string_column + Set<String> longStringColumns = new HashSet<>(); + if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) { + String[] specifiedLongStrings = + options.get(CarbonCommonConstants.LONG_STRING_COLUMNS).toLowerCase().split(","); + for (String str : specifiedLongStrings) { + longStringColumns.add(str.trim()); + } validateLongStringColumns(carbonSchema, longStringColumns); } + // for the longstring field, change the datatype from string to varchar this.schema = updateSchemaFields(carbonSchema, longStringColumns); // build CarbonTable using schema CarbonTable table = buildCarbonTable(); @@ -603,12 +609,11 @@ public class CarbonWriterBuilder { for (int i = 0; i < fields.length; i++) { if (fields[i] != null) { fields[i].updateNameToLowerCase(); - } - - if (longStringColumns != null) { - /* Also update the string type to varchar */ - if (longStringColumns.contains(fields[i].getFieldName())) { - fields[i].updateDataTypeToVarchar(); + if (longStringColumns != null) { + /* Also update the string type to varchar */ + if (longStringColumns.contains(fields[i].getFieldName())) { + fields[i].updateDataTypeToVarchar(); + } } } }