This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 2ec9009 [CARBONDATA-3631] StringIndexOutOfBoundsException When Inserting Select From a Parquet Table with Empty array/map 2ec9009 is described below commit 2ec90099595d6e5a7de4106d2f99b8725e4993ef Author: h00424960 <haoxing...@huawei.com> AuthorDate: Mon Dec 30 07:40:13 2019 +0800 [CARBONDATA-3631] StringIndexOutOfBoundsException When Inserting Select From a Parquet Table with Empty array/map Modification reason: (1) StringIndexOutOfBoundsException When Inserting Select From a Parquet Table with Empty array/map. (2) ArrayIndexOutOfBoundsException When Inserting Select From a Parquet Table with a map with empty key and empty value (3) Result is incorrect when Inserting Select From a Parquet Table with a Struct with Empty String, The result will be null while the correct result is "". Modification content: (1) When the input value is ARRAY(), return EMPTY_DATA_RETURN in the FieldConverter.scala, ArrayParserImpl handle it. (2) When the input value is ARRAY(""), return EMPTY STRING ->"" in the FieldConverter.scala, ArrayParserImpl handle it. (3) When the input value is MAP("",""), return EMPTY STRING ->"" in the FieldConverter.scala, MapParserImpl handle it. (4) When the input value is MAP(), return EMPTY_DATA_RETURN ->"" in the FieldConverter.scala, MapParserImpl handle it.
This closes #3545 --- .../core/constants/CarbonCommonConstants.java | 9 +++ .../primitiveTypes/ArrayDataTypeTestCase.scala | 61 ++++++++++++++++++ .../primitiveTypes/MapDataTypeTestCase.scala | 72 ++++++++++++++++++++++ .../loading/parser/impl/ArrayParserImpl.java | 8 ++- .../loading/parser/impl/MapParserImpl.java | 16 ++++- .../streaming/parser/FieldConverter.scala | 48 +++++++++------ 6 files changed, 191 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index fa88027..9ad276c 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -294,6 +294,15 @@ public final class CarbonCommonConstants { public static final String CARBON_SKIP_EMPTY_LINE_DEFAULT = "false"; + + /** + * In the write processing, data needs to be converted to string, where the string forms of + * array("") and array() are EMPTY_STRING, causing confusion about the write results. + * In order to distinguish between array("") and array(), or map("") and map(), we need to + * identify array("") as the EMPTY_STRING, while array() is the SIZE_ZERO_DATA_RETURN + */ + public static final String SIZE_ZERO_DATA_RETURN = "!LENGTH_ZERO_DATA_RETURN!"; + /** * Currently the segment lock files are not deleted immediately when unlock, * this value indicates the number of hours the segment lock files will be preserved. 
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/ArrayDataTypeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/ArrayDataTypeTestCase.scala new file mode 100644 index 0000000..fa5fe94 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/ArrayDataTypeTestCase.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.integration.spark.testsuite.primitiveTypes + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +/** + * Test Class for the Array data type (insert-select from Parquet) + */ +class ArrayDataTypeTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + sql("DROP TABLE IF EXISTS datatype_array_parquet") + sql("DROP TABLE IF EXISTS datatype_array_carbondata") + } + + test("test when insert select from a parquet table with an array with empty string") { + sql("create table datatype_array_parquet(col array<string>) stored as parquet") + sql("create table datatype_array_carbondata(col array<string>) stored as carbondata") + sql("insert into datatype_array_parquet values(array(''))") + sql("insert into datatype_array_carbondata select * from datatype_array_parquet") + checkAnswer( + sql("SELECT * FROM datatype_array_carbondata"), + sql("SELECT * FROM datatype_array_parquet")) + sql("DROP TABLE IF EXISTS datatype_array_carbondata") + sql("DROP TABLE IF EXISTS datatype_array_parquet") + } + + test("test when insert select from a parquet table with empty array") { + sql("create table datatype_array_parquet(col array<string>) stored as parquet") + sql("create table datatype_array_carbondata(col array<string>) stored as carbondata") + sql("insert into datatype_array_parquet values(array())") + sql("insert into datatype_array_carbondata select * from datatype_array_parquet") + checkAnswer( + sql("SELECT * FROM datatype_array_carbondata"), + sql("SELECT * FROM datatype_array_parquet")) + sql("DROP TABLE IF EXISTS datatype_array_carbondata") + sql("DROP TABLE IF EXISTS datatype_array_parquet") + } + + override def afterAll { + sql("DROP TABLE IF EXISTS datatype_array_carbondata") + sql("DROP TABLE IF EXISTS datatype_array_parquet") + } +} \ No newline at end of file diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/MapDataTypeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/MapDataTypeTestCase.scala new file mode 100644 index 0000000..7686e25 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/MapDataTypeTestCase.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.integration.spark.testsuite.primitiveTypes + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +/** + * Test Class for the Map data type (insert-select from Parquet) + */ +class MapDataTypeTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + sql("DROP TABLE IF EXISTS datatype_map_parquet") + sql("DROP TABLE IF EXISTS datatype_map_carbondata") + } + + test("test when insert select from a parquet table with an map with empty key and value") { + sql("create table datatype_map_parquet(col map<string,string>) stored as parquet") + sql("create table datatype_map_carbondata(col map<string,string>) stored as carbondata") + sql("insert into datatype_map_parquet values(map('',''))") + sql("insert into datatype_map_carbondata select * from datatype_map_parquet") + checkAnswer( + sql("SELECT * FROM datatype_map_carbondata"), + sql("SELECT * FROM datatype_map_parquet")) + sql("DROP TABLE IF EXISTS datatype_map_carbondata") + sql("DROP TABLE IF EXISTS datatype_map_parquet") + } + + test("test when insert select from a parquet table with an map with empty key") { + sql("create table datatype_map_parquet(col map<string,string>) stored as parquet") + sql("create table datatype_map_carbondata(col map<string,string>) stored as carbondata") + sql("insert into datatype_map_parquet values(map('','value'))") + sql("insert into datatype_map_carbondata select * from datatype_map_parquet") + checkAnswer( + sql("SELECT * FROM datatype_map_carbondata"), + sql("SELECT * FROM datatype_map_parquet")) + sql("DROP TABLE IF EXISTS datatype_map_carbondata") + sql("DROP TABLE IF EXISTS datatype_map_parquet") + } + + test("test when insert select from a parquet table with an map with empty value") { + sql("create table datatype_map_parquet(col map<string,string>) stored as parquet") + sql("create table datatype_map_carbondata(col map<string,string>) stored as carbondata") + sql("insert into datatype_map_parquet 
values(map('key',''))") + sql("insert into datatype_map_carbondata select * from datatype_map_parquet") + checkAnswer( + sql("SELECT * FROM datatype_map_carbondata"), + sql("SELECT * FROM datatype_map_parquet")) + sql("DROP TABLE IF EXISTS datatype_map_carbondata") + sql("DROP TABLE IF EXISTS datatype_map_parquet") + } + + override def afterAll { + sql("DROP TABLE IF EXISTS datatype_map_carbondata") + sql("DROP TABLE IF EXISTS datatype_map_parquet") + } +} \ No newline at end of file diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java index 82980fa..ab46137 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java @@ -19,6 +19,7 @@ package org.apache.carbondata.processing.loading.parser.impl; import java.util.regex.Pattern; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; import org.apache.carbondata.processing.loading.parser.ComplexParser; @@ -48,7 +49,8 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> { public ArrayObject parse(Object data) { if (data != null) { String value = data.toString(); - if (!value.isEmpty() && !value.equals(nullFormat)) { + if (!value.isEmpty() && !value.equals(nullFormat) + && !value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) { String[] split = pattern.split(value, -1); if (ArrayUtils.isNotEmpty(split)) { Object[] array = new Object[split.length]; @@ -61,6 +63,10 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> { Object[] array = new Object[1]; array[0] = child.parse(value); return new ArrayObject(array); + } else if 
(value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) { + // When the data is not array('') but array(), an array with zero size should be returned. + Object[] array = new Object[0]; + return new ArrayObject(array); } } return null; diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java index 806cf31..6e978f0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; import org.apache.commons.lang.ArrayUtils; @@ -40,13 +41,20 @@ public class MapParserImpl extends ArrayParserImpl { public ArrayObject parse(Object data) { if (data != null) { String value = data.toString(); - if (!value.isEmpty() && !value.equals(nullFormat)) { + if (!value.isEmpty() && !value.equals(nullFormat) + // && !value.equals(keyValueDelimiter) + && !value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) { String[] split = pattern.split(value, -1); if (ArrayUtils.isNotEmpty(split)) { ArrayList<Object> array = new ArrayList<>(); Map<Object, String> map = new HashMap<>(); for (int i = 0; i < split.length; i++) { - Object currKey = split[i].split(keyValueDelimiter)[0]; + Object[] splitedKeyAndValue = split[i].split(keyValueDelimiter); + // When both key and value are EMPTY_STRING, the length of the split + // result will be 0. Then the currKey should be initialized as an empty object. + // Otherwise, an ArrayIndexOutOfBoundsException will be thrown. + Object currKey = splitedKeyAndValue.length > 0 ? 
split[i].split(keyValueDelimiter)[0] + : new Object(); map.put(currKey, split[i]); } for (Map.Entry<Object, String> entry : map.entrySet()) { @@ -54,6 +62,10 @@ public class MapParserImpl extends ArrayParserImpl { } return new ArrayObject(array.toArray()); } + } else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) { + // When the data is not map('','') but map(), an array with zero size should be returned. + Object[] array = new Object[0]; + return new ArrayObject(array); } } return null; diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala index 0cf244a..10f9b40 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala @@ -66,31 +66,39 @@ object FieldConverter { case bs: Array[Byte] => new String(bs, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)) case s: scala.collection.Seq[Any] => - val delimiter = complexDelimiters.get(level) - val builder = new StringBuilder() - s.foreach { x => - val nextLevel = level + 1 - builder.append(objectToString(x, serializationNullFormat, complexDelimiters, - timeStampFormat, dateFormat, isVarcharType, level = nextLevel)) - .append(delimiter) + if (s.nonEmpty) { + val delimiter = complexDelimiters.get(level) + val builder = new StringBuilder() + s.foreach { x => + val nextLevel = level + 1 + builder.append(objectToString(x, serializationNullFormat, complexDelimiters, + timeStampFormat, dateFormat, isVarcharType, level = nextLevel)) + .append(delimiter) + } + builder.substring(0, builder.length - delimiter.length()) + } else { + CarbonCommonConstants.SIZE_ZERO_DATA_RETURN } - builder.substring(0, builder.length - delimiter.length()) // First convert the 'key' of Map and then append the keyValueDelimiter and then convert // the 'value of the map and 
append delimiter case m: scala.collection.Map[_, _] => - val nextLevel = level + 2 - val delimiter = complexDelimiters.get(level) - val keyValueDelimiter = complexDelimiters.get(level + 1) - val builder = new StringBuilder() - m.foreach { x => - builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters, - timeStampFormat, dateFormat, isVarcharType, level = nextLevel)) - .append(keyValueDelimiter) - builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters, - timeStampFormat, dateFormat, isVarcharType, level = nextLevel)) - .append(delimiter) + if (m.nonEmpty) { + val nextLevel = level + 2 + val delimiter = complexDelimiters.get(level) + val keyValueDelimiter = complexDelimiters.get(level + 1) + val builder = new StringBuilder() + m.foreach { x => + builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters, + timeStampFormat, dateFormat, isVarcharType, level = nextLevel)) + .append(keyValueDelimiter) + builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters, + timeStampFormat, dateFormat, isVarcharType, level = nextLevel)) + .append(delimiter) + } + builder.substring(0, builder.length - delimiter.length()) + } else { + CarbonCommonConstants.SIZE_ZERO_DATA_RETURN } - builder.substring(0, builder.length - delimiter.length()) case r: org.apache.spark.sql.Row => val delimiter = complexDelimiters.get(level) val builder = new StringBuilder()