This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 00f64c6  [CARBONDATA-3565] Fix complex binary data broken issue when loading dataframe data
00f64c6 is described below

commit 00f64c6f32710a0e2beddfdb6403b2faa879e031
Author: IceMimosa <chk19940...@gmail.com>
AuthorDate: Tue Jan 7 13:24:57 2020 +0800

    [CARBONDATA-3565] Fix complex binary data broken issue when loading dataframe data

    Why is this PR needed?
    When binary data is written with DataOutputStream#writeDouble (or a similar
    method) and then loaded into a table through a Spark DataFrame (SQL), the
    data comes back broken (EF BF BD) when it is read out.

    What changes were proposed in this PR?
    If the data is already byte[], there is no need to convert it to a string
    and then decode it back to byte[] again.

    Does this PR introduce any user interface change?
    No

    Is any new testcase added?
    Yes

    This closes #3430
---
 .../spark/rdd/NewCarbonDataLoadRDD.scala      | 34 ++++++++++++++++------
 .../testsuite/binary/TestBinaryDataType.scala | 34 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 9 deletions(-)
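For context, EF BF BD is the UTF-8 encoding of U+FFFD, the replacement character a decoder substitutes for byte sequences that are not valid UTF-8. A minimal sketch of the corruption mode follows; the real conversion path through CarbonScalaUtil is simplified here to a plain bytes-to-String round trip:

    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import java.nio.charset.StandardCharsets.UTF_8
    import java.util.Arrays

    val baos = new ByteArrayOutputStream()
    val dos = new DataOutputStream(baos)
    dos.writeDouble(0.998123d)  // big-endian IEEE 754 bytes, not valid UTF-8 text
    val original = baos.toByteArray

    // What the old path effectively did: byte[] -> String -> byte[].
    val roundTripped = new String(original, UTF_8).getBytes(UTF_8)

    // Undecodable sequences come back as U+FFFD, re-encoded as EF BF BD,
    // so the loaded payload no longer matches the written bytes.
    assert(!Arrays.equals(original, roundTripped))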
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 23a2683..7caf644 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.{CollectionAccumulator, SparkUtil}
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
@@ -351,7 +351,7 @@ class NewDataFrameLoaderRDD[K, V](
 
 /**
  * This class wrap Scala's Iterator to Java's Iterator.
- * It also convert all columns to string data to use csv data loading flow.
+ * It also convert all columns to string data (exclude binary type) to use csv data loading flow.
  *
  * @param rddIter
  * @param carbonLoadModel
@@ -378,6 +378,9 @@ class NewRddIterator(rddIter: Iterator[Row],
   private val isComplexTypeMapping = carbonLoadModel.getCarbonDataLoadSchema
     .getCarbonTable.getCreateOrderColumn.asScala.map(_.isComplex())
+  private val isDefaultBinaryDecoder = carbonLoadModel.getBinaryDecoder == null ||
+    CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT.equals(
+      carbonLoadModel.getBinaryDecoder)
 
   def hasNext: Boolean = rddIter.hasNext
 
   def next: Array[AnyRef] = {
@@ -386,10 +389,15 @@
       val len = columns.length
       var i = 0
       while (i < len) {
-        columns(i) = CarbonScalaUtil.getString(row, i, carbonLoadModel, serializationNullFormat,
-          complexDelimiters, timeStampFormat, dateFormat,
-          isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i),
-          isComplexType = i < isComplexTypeMapping.size && isComplexTypeMapping(i))
+        columns(i) = row.get(i) match {
+          case bs if bs.isInstanceOf[Array[Byte]] && isDefaultBinaryDecoder =>
+            bs.asInstanceOf[Array[Byte]]
+          case _ =>
+            CarbonScalaUtil.getString(row, i, carbonLoadModel, serializationNullFormat,
+              complexDelimiters, timeStampFormat, dateFormat,
+              isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i),
+              isComplexType = i < isComplexTypeMapping.size && isComplexTypeMapping(i))
+        }
         i += 1
       }
       columns
@@ -438,6 +446,9 @@ class LazyRddIterator(serializer: SerializerInstance,
       r.isDefined && r.get
     })
   }
+  private val isDefaultBinaryDecoder = carbonLoadModel.getBinaryDecoder == null ||
+    CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT.equals(
+      carbonLoadModel.getBinaryDecoder)
 
   private var rddIter: Iterator[Row] = null
   private var uninitialized = true
@@ -460,9 +471,14 @@
     val row = rddIter.next()
     val columns = new Array[AnyRef](row.length)
     for (i <- 0 until columns.length) {
-      columns(i) = CarbonScalaUtil.getString(row, i, carbonLoadModel, serializationNullFormat,
-        complexDelimiters, timeStampFormat, dateFormat,
-        isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
+      columns(i) = row.get(i) match {
+        case bs if bs.isInstanceOf[Array[Byte]] && isDefaultBinaryDecoder =>
+          bs.asInstanceOf[Array[Byte]]
+        case _ =>
+          CarbonScalaUtil.getString(row, i, carbonLoadModel, serializationNullFormat,
+            complexDelimiters, timeStampFormat, dateFormat,
+            isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
+      }
     }
     columns
   }
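Both iterators now share the same per-column dispatch: when the default binary decoder is in effect, a binary column is passed through untouched; everything else still goes through the string conversion used by the CSV loading flow. A standalone sketch of that pattern, where convertColumn and stringify are illustrative placeholders rather than CarbonData API:

    // Binary columns skip the string round trip under the default decoder;
    // all other values are stringified for the CSV-style loading flow.
    def convertColumn(value: Any, isDefaultBinaryDecoder: Boolean)
                     (stringify: Any => String): AnyRef = {
      value match {
        case bytes: Array[Byte] if isDefaultBinaryDecoder => bytes
        case other => stringify(other)
      }
    }

As in the patch, a null column value does not match the Array[Byte] case, so it still falls through to the string conversion and the configured null format.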
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 4cd2b66..ae0eea8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.integration.spark.testsuite.binary
 
+import java.io.{ByteArrayOutputStream, DataOutputStream}
 import java.util.Arrays
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -1665,6 +1666,39 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
         sql("DROP TABLE binaryTable")
     }
 
+    test("test complex binary insert into table") {
+        CarbonProperties.getInstance()
+            .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
+        import sqlContext.implicits._
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql("DROP TABLE IF EXISTS binaryTable_carbondata")
+        sql("DROP TABLE IF EXISTS binaryTable_carbon")
+        sql(s"""CREATE TABLE IF NOT EXISTS binaryTable( binaryField binary ) STORED AS carbondata""")
+        sql(s"""CREATE TABLE IF NOT EXISTS binaryTable_carbondata( binaryField binary ) USING CARBONDATA""")
+        sql(s"""CREATE TABLE IF NOT EXISTS binaryTable_carbon( binaryField binary ) USING CARBON""")
+        // create binary data
+        val baos = new ByteArrayOutputStream()
+        val dos = new DataOutputStream(baos)
+        dos.writeInt(123)
+        dos.writeChars("abc")
+        dos.writeDouble(0.998123D)
+        dos.writeChars("def")
+        val bytes = baos.toByteArray
+
+        Seq(bytes).toDF("binaryField").write.insertInto("binaryTable")
+        Seq(bytes).toDF("binaryField").write.insertInto("binaryTable_carbondata")
+        Seq(bytes).toDF("binaryField").write.insertInto("binaryTable_carbon")
+        checkAnswer(sql("SELECT * FROM binaryTable"), Seq(Row(bytes)))
+        checkAnswer(sql("SELECT * FROM binaryTable_carbondata"), Seq(Row(bytes)))
+        checkAnswer(sql("SELECT * FROM binaryTable_carbon"), Seq(Row(bytes)))
+        sql("DROP TABLE binaryTable")
+        sql("DROP TABLE binaryTable_carbondata")
+        sql("DROP TABLE binaryTable_carbon")
+        CarbonProperties.getInstance()
+            .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
+                CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
+    }
+
     override def afterAll: Unit = {
         sqlContext.sparkSession.conf.unset("hive.exec.dynamic.partition.mode")
         sql("DROP TABLE IF EXISTS binaryTable")
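If the stored bytes ever need checking beyond checkAnswer's byte-array comparison, the payload can be read back in the order the test wrote it. A small helper, assuming the exact layout from the test above (an Int, the chars of "abc", a Double, the chars of "def"):

    import java.io.{ByteArrayInputStream, DataInputStream}

    // writeChars emits two bytes per character, so each string is read
    // back with one readChar call per character.
    def decodePayload(bytes: Array[Byte]): (Int, String, Double, String) = {
      val dis = new DataInputStream(new ByteArrayInputStream(bytes))
      val n  = dis.readInt()
      val s1 = (1 to 3).map(_ => dis.readChar()).mkString
      val d  = dis.readDouble()
      val s2 = (1 to 3).map(_ => dis.readChar()).mkString
      (n, s1, d, s2)
    }

    // decodePayload(bytes) == (123, "abc", 0.998123, "def")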