This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 181c1b3  [CARBONDATA-3887] Fixed insert failure for global sort null data
181c1b3 is described below

commit 181c1b3c8d20678db9fd205f8afd376c32d3fb7d
Author: kunal642 <kunalkapoor...@gmail.com>
AuthorDate: Fri Jul 3 13:31:05 2020 +0530

    [CARBONDATA-3887] Fixed insert failure for global sort null data

    Why is this PR needed?
    Load data is failing with an "Unsupported dataType: String" exception when
    null data is loaded into a string column of a global_sort table, because
    there was no handling for null data in string columns.

    What changes were proposed in this PR?
    Added a check for null data and handled it in the global sort flow.

    Does this PR introduce any user interface change?
    No

    Is any new testcase added?
    Yes

    This closes #3822
---
 .../apache/carbondata/spark/util/CommonUtil.scala  | 93 +++++++++++-----------
 .../dataload/TestGlobalSortDataLoad.scala          | 12 +++
 2 files changed, 60 insertions(+), 45 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7e98605..dcbc9d2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -921,52 +921,55 @@ object CommonUtil {
     var i = 0
     val fieldTypesLen = fields.length
     while (i < fieldTypesLen) {
-      if (!row.isNullAt(i)) {
-        fields(i).dataType match {
-          case StringType =>
-            data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+      fields(i).dataType match {
+        case StringType =>
+          data(i) = if (row.isNullAt(i)) {
+            DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(null,
               DataTypes.STRING)
-          case d: DecimalType =>
-            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-          case arrayType : ArrayType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
-              .writeByteArray(result.asInstanceOf[ArrayObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case structType : StructType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
-              .writeByteArray(result.asInstanceOf[StructObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case mapType : MapType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, mapType), mapType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
-              .writeByteArray(result.asInstanceOf[ArrayObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case other =>
-            data(i) = row.get(i, other)
-        }
+          } else {
+            DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+              DataTypes.STRING)
+          }
+        case d: DecimalType =>
+          data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+        case arrayType: ArrayType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+            .writeByteArray(result.asInstanceOf[ArrayObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case structType: StructType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
+            .writeByteArray(result.asInstanceOf[StructObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case mapType: MapType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, mapType), mapType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+            .writeByteArray(result.asInstanceOf[ArrayObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case other =>
+          data(i) = row.get(i, other)
       }
       i += 1
     }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index a228e21..f1851f1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -89,6 +89,8 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql("DROP TABLE IF EXISTS carbon_globalsort_minor")
     sql("DROP TABLE IF EXISTS carbon_globalsort_major")
     sql("DROP TABLE IF EXISTS carbon_globalsort_custom")
+    sql("drop table if exists source")
+    sql("drop table if exists sink")
   }
 
   // ----------------------------------- Compare Result -----------------------------------
@@ -468,6 +470,16 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       sql("SELECT * FROM carbon_localsort_difftypes ORDER BY shortField"))
   }
 
+  test("test global sort with null values") {
+    sql("drop table if exists source")
+    sql("drop table if exists sink")
+    sql("create table source(a string, b int, c int, d int, e int, f int) stored as carbondata TBLPROPERTIES('bad_record_action'='force')")
+    sql("insert into source select 'k','k', 'k','k','k', 'k'")
+    sql("create table sink (a string, b string, c int, d bigint, e double, f char(5)) stored as carbondata TBLPROPERTIES('sort_scope'='global_sort', 'sort_columns'='b,c,d,f')")
+    sql("insert into sink select * from source")
+    checkAnswer(sql("select * from sink"), Row("k", null, null,null,null, null))
+  }
+
   private def resetConf() {
     CarbonProperties.getInstance()
      .removeProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
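
For context, the essence of the fix: before this patch the type dispatch was
guarded by !row.isNullAt(i), so a null value in a StringType column never
reached the string branch and instead fell through to the "Unsupported
dataType: String" error; the patch moves the null check inside the StringType
case. Below is a minimal standalone sketch of that pattern. The helper name
toNoDictionaryBytes and the empty-array null marker are illustrative
stand-ins, not the actual behavior of
DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn.

import java.nio.charset.StandardCharsets

object NullStringSketch {
  // Stand-in converter: the real DataTypeUtil method also dispatches on the
  // column's Carbon data type; here we model only the null/non-null split.
  def toNoDictionaryBytes(value: String): Array[Byte] =
    if (value == null) Array.empty[Byte] // assumed null marker, for illustration
    else value.getBytes(StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val row: Array[String] = Array("k", null)
    // The null check lives inside the string branch, mirroring the patched
    // code, so null values are converted instead of raising an exception.
    val converted = row.map(toNoDictionaryBytes)
    converted.foreach(bytes => println(bytes.length)) // prints 1, then 0
  }
}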