Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14862941
  
    --- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
             val hconf = broadcastedHiveConf.value.value
             val rowWithPartArr = new Array[Object](2)
    -        // Map each tuple to a row object
    -        iter.map { value =>
    -          val deserializer = localDeserializer.newInstance()
    -          deserializer.initialize(hconf, partProps)
    -          val deserializedRow = deserializer.deserialize(value)
    -          rowWithPartArr.update(0, deserializedRow)
    -          rowWithPartArr.update(1, partValues)
    -          rowWithPartArr.asInstanceOf[Object]
    +
    +        val partSerDe = localDeserializer.newInstance()
    +        val tableSerDe = tableSerDeClass.newInstance()
    +        partSerDe.initialize(hconf, partProps)
    +        tableSerDe.initialize(hconf,  tableDesc.getProperties)
    +
    +        val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
    +          partSerDe.getObjectInspector, tableSerDe.getObjectInspector, 
true)
    +          .asInstanceOf[StructObjectInspector]
    +        val partTblObjectInspectorConverter = 
ObjectInspectorConverters.getConverter(
    +          partSerDe.getObjectInspector, tblConvertedOI)
    +
    +        // This is done per partition, and unnecessary to put it in the 
iterations (in iter.map).
    +        rowWithPartArr.update(1, partValues)
    +
    +        // Map each tuple to a row object.
    +        if 
(partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
    +          iter.map { case value =>
    +            rowWithPartArr.update(0, partSerDe.deserialize(value))
    +            rowWithPartArr.asInstanceOf[Object]
    +          }
    +        } else {
    +          iter.map { case value =>
    +            val deserializedRow = {
    +              // If partition schema does not match table schema, update 
the row to match.
    +              val convertedRow =
    +                
partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
    +
    +              // If conversion was performed, convertedRow will be a 
standard Object, but if
    +              // conversion wasn't necessary, it will still be lazy. We 
can't have both across
    +              // partitions, so we serialize and deserialize again to make 
it lazy.
    +              if (tableSerDe.isInstanceOf[OrcSerde]) {
    +                convertedRow
    +              } else {
    +                convertedRow match {
    --- End diff --
    
    Yeah, the code is from Shark, and it is a little bit tricky. I think the 
logic here is:
    * We assumes the table object inspector is always a lazy objectinspector, 
and the deserializer always produces the lazy objects
    * We assumes the ObjectInspectorConverter always produces NON LAZY objects 
if the objectinspectors ARE NOT compatible.
    * If `convertedRow` is the lazy object, which means partition 
objectinspector is compatible with the table objectinspector(`convertedRow` can 
be retrieved directly), otherwise, the non lazy `convertedRow` is not 
acceptable by the table object inspector( table object inspector is the lazy 
object inspector), hence we need to convert it by serializing and 
de-serializing again.
    
    I don't think we need to maintain the logic here, as we can provide a 
better solution for the partition based table scanning. All we need to do is 
converting the `raw object` into `MutableRow` directly, as we did in 
`HiveTableScan` of Spark.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to