This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f516b  [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
00f516b is described below

commit 00f516bbfaa58efa5f2e7914cfe01bddb0bcc812
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Sat Dec 28 16:54:06 2019 +0800

    [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
    
    Problem:
    Global sort throws an exception in the load-from-CSV flow when the table has binary non-sort columns.
    The exception is attached to the JIRA issue and a test case is added in this PR.

    Cause: In the global sort flow, a CSV RDD of string rows is built from the input files, but this string RDD is then converted again into new Scala String objects. For binary columns the converted value exceeded the 32000-character limit, hence the exception.

    Solution: In the CSV load flow, skip this extra String object conversion, since the row values are already String objects.
    This also handles [CARBONDATA-3638].
    
    This closes #3540
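
For context, here is a minimal, standalone Scala sketch of the idea behind the fix (illustrative only: CsvInputPathSketch, genericInput and csvInput are made-up names, not CarbonData APIs, and the 32000 limit simply mirrors the length mentioned above). It contrasts a generic input path that re-converts every field to a String and applies a length check, which an oversized hex-encoded binary value trips, with a pass-through path for fields that are already Strings; the diff below takes the pass-through approach via StringArrayRow and inputFuncForCsvRows.

object CsvInputPathSketch {

  val MaxCharsPerColumn = 32000

  // Generic input path: fields arrive as arbitrary objects, so each one is
  // converted to a String and validated; a long hex-encoded binary field can
  // exceed the limit.
  def genericInput(fields: Array[Any]): Array[String] =
    fields.map { field =>
      val s = field.toString // the per-field conversion the CSV path avoids
      if (s.length > MaxCharsPerColumn) {
        throw new IllegalArgumentException(
          s"column value has ${s.length} characters, limit is $MaxCharsPerColumn")
      }
      s
    }

  // CSV input path: the parsed fields are already Strings, so they are handed
  // to the row parser as-is, with no per-field conversion.
  def csvInput(fields: Array[String]): Array[String] = fields

  def main(args: Array[String]): Unit = {
    val hexImage = "89504e470d0a1a0a" * 5000 // ~80000 chars, like a hex-encoded PNG
    val row = Array("1", "false", "1.png", hexImage, "true")

    println(csvInput(row).length) // pass-through succeeds: 5 fields, untouched
    try {
      genericInput(row.toArray[Any]) // generic path rejects the oversized field
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}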
---
 .../testsuite/binary/TestBinaryDataType.scala      | 42 ++++++++++++++++++++++
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 25 ++++++++++---
 .../spark/load/DataLoadProcessorStepOnSpark.scala  | 29 +++++++++++++++
 3 files changed, 91 insertions(+), 5 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 7550581..84675fc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -112,6 +112,48 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
         }
     }
 
+    test("Create table and load data with binary column with other global sort columns") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED AS CARBONDATA
+               | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE' = 'global_sort')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        try {
+            val df = sql("SELECT * FROM binaryTable").collect()
+            assert(3 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+                val bytes40 = each.getAs[Array[Byte]](3).slice(0, 40)
+                val binaryName = each(2).toString
+                val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get)
+                assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect value for binary data")
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
     private val firstBytes20 = Map("1.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 74),
         "2.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 2, -11),
         "3.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 54)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index bb5e946..ae859c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.store.CarbonRowReadSupport
 
@@ -67,10 +67,12 @@ object DataLoadProcessBuilderOnSpark {
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    var isLoadFromCSV = false
     val originRDD = if (dataFrame.isDefined) {
       dataFrame.get.rdd
     } else {
       // input data from files
+      isLoadFromCSV = true
       val columnCount = model.getCsvHeaderColumns.length
       CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
@@ -90,11 +92,24 @@ object DataLoadProcessBuilderOnSpark {
 
     val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input
-    val inputRDD = originRDD
-      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
-      .mapPartitionsWithIndex { case (index, rows) =>
-        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+    val inputRDD = if (isLoadFromCSV) {
+      // No need of wrap with NewRDDIterator, which converts object to string,
+      // as it is already a string.
+      // So, this will avoid new object creation in case of CSV global sort load for each row
+      originRDD.mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark.inputFuncForCsvRows(
+          rows.asInstanceOf[Iterator[StringArrayRow]],
+          index,
+          modelBroadcast,
+          inputStepRowCounter)
       }
+    } else {
+      originRDD
+        .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
+        .mapPartitionsWithIndex { case (index, rows) =>
+          DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+        }
+    }
 
     // 2. Convert
     val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index ff0e1bf..041019a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -97,6 +97,35 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def inputFuncForCsvRows(
+      rows: Iterator[StringArrayRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        val rawRow = rows.next().values.asInstanceOf[Array[Object]]
+        val row = if (isRawDataRequired) {
+          new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          new CarbonRow(rowParser.parseRow(rawRow))
+        }
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
   def internalInputFunc(
       rows: Iterator[InternalRow],
       index: Int,
