This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 00f516b  [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
00f516b is described below

commit 00f516bbfaa58efa5f2e7914cfe01bddb0bcc812
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Sat Dec 28 16:54:06 2019 +0800

    [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns

    Problem: Global sort throws an exception in the load-from-CSV flow when the table has
    binary non-sort columns. The exception is attached in the JIRA and a test case is added
    in the PR.

    Cause: For the global sort flow, we build a csvRDD from the file, but this String RDD is
    then converted again into new String Scala objects. For binary columns the value exceeded
    the 32000 length limit, hence the exception.

    Solution: For the CSV load flow, avoid this extra String object conversion, since the
    values are already String objects. This also handles [CARBONDATA-3638].

    This closes #3540
---
 .../testsuite/binary/TestBinaryDataType.scala       | 42 ++++++++++++++++++++++
 .../spark/load/DataLoadProcessBuilderOnSpark.scala  | 25 ++++++++++---
 .../spark/load/DataLoadProcessorStepOnSpark.scala   | 29 +++++++++++++++
 3 files changed, 91 insertions(+), 5 deletions(-)
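Before the diffs, the failing scenario is easiest to see as plain SQL. The snippet below is a condensed sketch of the new test added in TestBinaryDataType.scala, not a verbatim copy; `spark` (a CarbonData-enabled SparkSession) and `resourcesPath` (the test suite's resource folder) are assumed to exist in the calling scope.

    // Scenario covered by the new test, expressed as plain spark.sql calls.
    // `spark` and `resourcesPath` are assumptions of this sketch.
    spark.sql("DROP TABLE IF EXISTS binaryTable")
    spark.sql(
      s"""
         | CREATE TABLE IF NOT EXISTS binaryTable (
         |   id int, label boolean, name string, binaryField binary, autoLabel boolean)
         | STORED AS CARBONDATA
         | TBLPROPERTIES('SORT_COLUMNS'='id', 'SORT_SCOPE'='global_sort')
       """.stripMargin)
    // Before this commit, the global sort load below failed once a binary value
    // exceeded the 32000 length described in the commit message.
    spark.sql(
      s"""
         | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
         | INTO TABLE binaryTable
         | OPTIONS('header'='false')
       """.stripMargin)
    spark.sql("SELECT COUNT(*) FROM binaryTable").show()   // the test expects 3 rows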
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 7550581..84675fc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -112,6 +112,48 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
         }
     }
 
+    test("Create table and load data with binary column with other global sort columns") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED AS CARBONDATA
+               | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE' = 'global_sort')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        try {
+            val df = sql("SELECT * FROM binaryTable").collect()
+            assert(3 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+                val bytes40 = each.getAs[Array[Byte]](3).slice(0, 40)
+                val binaryName = each(2).toString
+                val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get)
+                assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect value for binary data")
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
     private val firstBytes20 = Map("1.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 74),
         "2.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 2, -11),
         "3.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 54)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index bb5e946..ae859c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.store.CarbonRowReadSupport
 
@@ -67,10 +67,12 @@ object DataLoadProcessBuilderOnSpark {
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
       hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    var isLoadFromCSV = false
     val originRDD = if (dataFrame.isDefined) {
       dataFrame.get.rdd
     } else {
       // input data from files
+      isLoadFromCSV = true
       val columnCount = model.getCsvHeaderColumns.length
       CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
@@ -90,11 +92,24 @@ object DataLoadProcessBuilderOnSpark {
     val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
 
     // 1. Input
-    val inputRDD = originRDD
-      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
-      .mapPartitionsWithIndex { case (index, rows) =>
-        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+    val inputRDD = if (isLoadFromCSV) {
+      // No need of wrap with NewRDDIterator, which converts object to string,
+      // as it is already a string.
+      // So, this will avoid new object creation in case of CSV global sort load for each row
+      originRDD.mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark.inputFuncForCsvRows(
+          rows.asInstanceOf[Iterator[StringArrayRow]],
+          index,
+          modelBroadcast,
+          inputStepRowCounter)
       }
+    } else {
+      originRDD
+        .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
+        .mapPartitionsWithIndex { case (index, rows) =>
+          DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+        }
+    }
 
     // 2. Convert
     val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
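The per-row effect of the change above, and of the new inputFuncForCsvRows in the next diff, can be sketched as follows. This is illustrative only: of the names below, only StringArrayRow and its values field come from the patch; the two helper methods are assumptions that stand in for the generic wrapping done by NewRDDIterator and for the direct reuse in the CSV path.

    import org.apache.carbondata.spark.rdd.StringArrayRow

    object CsvInputSketch {
      // Generic path (DataFrame input): every field is re-materialised as a new
      // String before row parsing, which is where a long binary value runs into
      // the 32000 length limit mentioned in the commit message.
      def copiedFields(anyFields: Array[Any]): Array[String] =
        anyFields.map(f => if (f == null) null else f.toString)

      // CSV path after this patch: the CSV reader already produced Strings, so the
      // existing array is handed to the row parser as-is, with no per-field copies
      // (this mirrors what inputFuncForCsvRows does in the next diff).
      def reusedFields(row: StringArrayRow): Array[Object] =
        row.values.asInstanceOf[Array[Object]]
    }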
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index ff0e1bf..041019a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -97,6 +97,35 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def inputFuncForCsvRows(
+      rows: Iterator[StringArrayRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        val rawRow = rows.next().values.asInstanceOf[Array[Object]]
+        val row = if (isRawDataRequired) {
+          new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          new CarbonRow(rowParser.parseRow(rawRow))
+        }
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
   def internalInputFunc(
       rows: Iterator[InternalRow],
       index: Int,