This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 181c1b3  [CARBONDATA-3887] Fixed insert failure for global sort null data
181c1b3 is described below

commit 181c1b3c8d20678db9fd205f8afd376c32d3fb7d
Author: kunal642 <kunalkapoor...@gmail.com>
AuthorDate: Fri Jul 3 13:31:05 2020 +0530

    [CARBONDATA-3887] Fixed insert failure for global sort null data
    
    Why is this PR needed?
    Load data was failing with an "Unsupported dataType: String" exception when null data was loaded into a string column of a global_sort table, because there was no handling of null values for string columns.
    
    What changes were proposed in this PR?
    Added a check for null data and handled it accordingly in the global sort flow.
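
    Condensed, the StringType branch of the row-conversion match in CommonUtil.scala now encodes null values explicitly instead of relying on the old top-level isNullAt skip (see the full diff below; only this branch is shown):

        fields(i).dataType match {
          case StringType =>
            // nulls are now converted like any other no-dictionary string value
            data(i) = if (row.isNullAt(i)) {
              DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(null, DataTypes.STRING)
            } else {
              DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i), DataTypes.STRING)
            }
          // ... remaining cases unchanged
        }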
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3822
---
 .../apache/carbondata/spark/util/CommonUtil.scala  | 93 +++++++++++-----------
 .../dataload/TestGlobalSortDataLoad.scala          | 12 +++
 2 files changed, 60 insertions(+), 45 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7e98605..dcbc9d2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -921,52 +921,55 @@ object CommonUtil {
     var i = 0
     val fieldTypesLen = fields.length
     while (i < fieldTypesLen) {
-      if (!row.isNullAt(i)) {
-        fields(i).dataType match {
-          case StringType =>
-            data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+      fields(i).dataType match {
+        case StringType =>
+          data(i) = if (row.isNullAt(i)) {
+            DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(null,
              DataTypes.STRING)
-          case d: DecimalType =>
-            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-          case arrayType : ArrayType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
-              .writeByteArray(result.asInstanceOf[ArrayObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case structType : StructType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
-              .writeByteArray(result.asInstanceOf[StructObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case mapType : MapType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, mapType), mapType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
-              .writeByteArray(result.asInstanceOf[ArrayObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case other =>
-            data(i) = row.get(i, other)
-        }
+          } else {
+            DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+              DataTypes.STRING)
+          }
+        case d: DecimalType =>
+          data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+        case arrayType: ArrayType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+            .writeByteArray(result.asInstanceOf[ArrayObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case structType: StructType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
+            .writeByteArray(result.asInstanceOf[StructObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case mapType: MapType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, mapType), mapType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+            .writeByteArray(result.asInstanceOf[ArrayObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case other =>
+          data(i) = row.get(i, other)
      }
       i += 1
     }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index a228e21..f1851f1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -89,6 +89,8 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql("DROP TABLE IF EXISTS carbon_globalsort_minor")
     sql("DROP TABLE IF EXISTS carbon_globalsort_major")
     sql("DROP TABLE IF EXISTS carbon_globalsort_custom")
+    sql("drop table if exists source")
+    sql("drop table if exists sink")
   }
 
  // ----------------------------------- Compare Result -----------------------------------
@@ -468,6 +470,16 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       sql("SELECT * FROM carbon_localsort_difftypes ORDER BY shortField"))
   }
 
+  test("test global sort with null values") {
+    sql("drop table if exists source")
+    sql("drop table if exists sink")
+    sql("create table source(a string, b int, c int, d int, e int, f int) 
stored as carbondata TBLPROPERTIES('bad_record_action'='force')")
+    sql("insert into source select 'k','k', 'k','k','k', 'k'")
+    sql("create table sink (a string, b string, c int, d bigint, e double, f 
char(5)) stored as carbondata TBLPROPERTIES('sort_scope'='global_sort', 
'sort_columns'='b,c,d,f')")
+    sql("insert into sink select * from source")
+    checkAnswer(sql("select * from sink"), Row("k", null, null,null,null, 
null))
+  }
+
   private def resetConf() {
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
