This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f64c6  [CARBONDATA-3565] Fix complex binary data broken issue when 
loading dataframe data
00f64c6 is described below

commit 00f64c6f32710a0e2beddfdb6403b2faa879e031
Author: IceMimosa <chk19940...@gmail.com>
AuthorDate: Tue Jan 7 13:24:57 2020 +0800

    [CARBONDATA-3565] Fix complex binary data broken issue when loading 
dataframe data
    
    Why is this PR needed?
    When binary data is produced with DataOutputStream#writeDouble and similar
    methods, and a Spark DataFrame (SQL) loads it into a table, the data is
    broken (bytes come back as EF BF BD) when it is read out again.
    
    What changes were proposed in this PR?
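    If the data is already a byte[] and the default binary decoder is in use,
    pass it through unchanged; there is no need to convert it to a string and
    then decode it back to byte[] again.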
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3430
---
 .../spark/rdd/NewCarbonDataLoadRDD.scala           | 34 ++++++++++++++++------
 .../testsuite/binary/TestBinaryDataType.scala      | 34 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 9 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 23a2683..7caf644 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.{CollectionAccumulator, 
SparkUtil}
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
@@ -351,7 +351,7 @@ class NewDataFrameLoaderRDD[K, V](
 
 /**
  * This class wrap Scala's Iterator to Java's Iterator.
- * It also convert all columns to string data to use csv data loading flow.
+ * It also convert all columns to string data (exclude binary type) to use csv 
data loading flow.
  *
  * @param rddIter
  * @param carbonLoadModel
@@ -378,6 +378,9 @@ class NewRddIterator(rddIter: Iterator[Row],
   private val isComplexTypeMapping =
     carbonLoadModel.getCarbonDataLoadSchema
       .getCarbonTable.getCreateOrderColumn.asScala.map(_.isComplex())
+  private val isDefaultBinaryDecoder = carbonLoadModel.getBinaryDecoder == 
null ||
+    CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT.equals(
+      carbonLoadModel.getBinaryDecoder)
   def hasNext: Boolean = rddIter.hasNext
 
   def next: Array[AnyRef] = {
@@ -386,10 +389,15 @@ class NewRddIterator(rddIter: Iterator[Row],
     val len = columns.length
     var i = 0
     while (i < len) {
-      columns(i) = CarbonScalaUtil.getString(row, i, carbonLoadModel, 
serializationNullFormat,
-        complexDelimiters, timeStampFormat, dateFormat,
-        isVarcharType = i < isVarcharTypeMapping.size && 
isVarcharTypeMapping(i),
-        isComplexType = i < isComplexTypeMapping.size && 
isComplexTypeMapping(i))
+      columns(i) = row.get(i) match {
+        case bs if bs.isInstanceOf[Array[Byte]] && isDefaultBinaryDecoder =>
+          bs.asInstanceOf[Array[Byte]]
+        case _ =>
+          CarbonScalaUtil.getString(row, i, carbonLoadModel, 
serializationNullFormat,
+            complexDelimiters, timeStampFormat, dateFormat,
+            isVarcharType = i < isVarcharTypeMapping.size && 
isVarcharTypeMapping(i),
+            isComplexType = i < isComplexTypeMapping.size && 
isComplexTypeMapping(i))
+      }
       i += 1
     }
     columns
@@ -438,6 +446,9 @@ class LazyRddIterator(serializer: SerializerInstance,
       r.isDefined && r.get
     })
   }
+  private val isDefaultBinaryDecoder = carbonLoadModel.getBinaryDecoder == 
null ||
+    CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT.equals(
+      carbonLoadModel.getBinaryDecoder)
 
   private var rddIter: Iterator[Row] = null
   private var uninitialized = true
@@ -460,9 +471,14 @@ class LazyRddIterator(serializer: SerializerInstance,
     val row = rddIter.next()
     val columns = new Array[AnyRef](row.length)
     for (i <- 0 until columns.length) {
-      columns(i) = CarbonScalaUtil.getString(row, i, carbonLoadModel, 
serializationNullFormat,
-        complexDelimiters, timeStampFormat, dateFormat,
-        isVarcharType = i < isVarcharTypeMapping.size && 
isVarcharTypeMapping(i))
+      columns(i) = row.get(i) match {
+        case bs if bs.isInstanceOf[Array[Byte]] && isDefaultBinaryDecoder =>
+          bs.asInstanceOf[Array[Byte]]
+        case _ =>
+          CarbonScalaUtil.getString(row, i, carbonLoadModel, 
serializationNullFormat,
+            complexDelimiters, timeStampFormat, dateFormat,
+            isVarcharType = i < isVarcharTypeMapping.size && 
isVarcharTypeMapping(i))
+      }
     }
     columns
   }
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 4cd2b66..ae0eea8 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.integration.spark.testsuite.binary
 
+import java.io.{ByteArrayOutputStream, DataOutputStream}
 import java.util.Arrays
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -1665,6 +1666,39 @@ class TestBinaryDataType extends QueryTest with 
BeforeAndAfterAll {
         sql("DROP TABLE binaryTable")
     }
 
+    test("test complex binary insert into table") {
+        CarbonProperties.getInstance()
+            
.addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
 "true")
+        import sqlContext.implicits._
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql("DROP TABLE IF EXISTS binaryTable_carbondata")
+        sql("DROP TABLE IF EXISTS binaryTable_carbon")
+        sql(s"""CREATE TABLE IF NOT EXISTS binaryTable( binaryField binary ) 
STORED AS carbondata""")
+        sql(s"""CREATE TABLE IF NOT EXISTS binaryTable_carbondata( binaryField 
binary ) USING CARBONDATA""")
+        sql(s"""CREATE TABLE IF NOT EXISTS binaryTable_carbon( binaryField 
binary ) USING CARBON""")
+        // create binary data
+        val baos = new ByteArrayOutputStream()
+        val dos = new DataOutputStream(baos)
+        dos.writeInt(123)
+        dos.writeChars("abc")
+        dos.writeDouble(0.998123D)
+        dos.writeChars("def")
+        val bytes = baos.toByteArray
+
+        Seq(bytes).toDF("binaryField").write.insertInto("binaryTable")
+        
Seq(bytes).toDF("binaryField").write.insertInto("binaryTable_carbondata")
+        Seq(bytes).toDF("binaryField").write.insertInto("binaryTable_carbon")
+        checkAnswer(sql("SELECT * FROM binaryTable"), Seq(Row(bytes)))
+        checkAnswer(sql("SELECT * FROM binaryTable_carbondata"), 
Seq(Row(bytes)))
+        checkAnswer(sql("SELECT * FROM binaryTable_carbon"), Seq(Row(bytes)))
+        sql("DROP TABLE binaryTable")
+        sql("DROP TABLE binaryTable_carbondata")
+        sql("DROP TABLE binaryTable_carbon")
+        CarbonProperties.getInstance()
+            
.addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
+                
CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
+    }
+
     override def afterAll: Unit = {
         sqlContext.sparkSession.conf.unset("hive.exec.dynamic.partition.mode")
         sql("DROP TABLE IF EXISTS binaryTable")

Reply via email to