Github user chenghao-intel commented on a diff in the pull request:
https://github.com/apache/spark/pull/1390#discussion_r14862941
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
// Create local references so that the outer object isn't serialized.
val tableDesc = _tableDesc
+ val tableSerDeClass = tableDesc.getDeserializerClass
+
val broadcastedHiveConf = _broadcastedHiveConf
val localDeserializer = partDeserializer
val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
- hivePartitionRDD.mapPartitions { iter =>
+ hivePartitionRDD.mapPartitions { case iter =>
val hconf = broadcastedHiveConf.value.value
val rowWithPartArr = new Array[Object](2)
- // Map each tuple to a row object
- iter.map { value =>
- val deserializer = localDeserializer.newInstance()
- deserializer.initialize(hconf, partProps)
- val deserializedRow = deserializer.deserialize(value)
- rowWithPartArr.update(0, deserializedRow)
- rowWithPartArr.update(1, partValues)
- rowWithPartArr.asInstanceOf[Object]
+
+ val partSerDe = localDeserializer.newInstance()
+ val tableSerDe = tableSerDeClass.newInstance()
+ partSerDe.initialize(hconf, partProps)
+ tableSerDe.initialize(hconf, tableDesc.getProperties)
+
+ val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
+ partSerDe.getObjectInspector, tableSerDe.getObjectInspector, true)
+ .asInstanceOf[StructObjectInspector]
+ val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
+ partSerDe.getObjectInspector, tblConvertedOI)
+
+ // This is done per partition, and unnecessary to put it in the iterations (in iter.map).
+ rowWithPartArr.update(1, partValues)
+
+ // Map each tuple to a row object.
+ if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
+ iter.map { case value =>
+ rowWithPartArr.update(0, partSerDe.deserialize(value))
+ rowWithPartArr.asInstanceOf[Object]
+ }
+ } else {
+ iter.map { case value =>
+ val deserializedRow = {
+ // If partition schema does not match table schema, update the row to match.
+ val convertedRow =
+ partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+
+ // If conversion was performed, convertedRow will be a standard Object, but if
+ // conversion wasn't necessary, it will still be lazy. We can't have both across
+ // partitions, so we serialize and deserialize again to make it lazy.
+ if (tableSerDe.isInstanceOf[OrcSerde]) {
+ convertedRow
+ } else {
+ convertedRow match {
--- End diff --
Yeah, the code is from Shark, and it is a little bit tricky. I think the logic here is:
* We assume the table ObjectInspector is always a lazy ObjectInspector, and that the deserializer always produces lazy objects.
* We assume the ObjectInspectorConverter always produces NON LAZY objects if the ObjectInspectors ARE NOT compatible.
* If `convertedRow` is a lazy object, the partition ObjectInspector is compatible with the table ObjectInspector and `convertedRow` can be read directly; otherwise, the non-lazy `convertedRow` is not acceptable to the table ObjectInspector (which is a lazy ObjectInspector), so we need to convert it by serializing and deserializing it again (see the sketch below).
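To make that serialize/deserialize step concrete, here is a minimal sketch, not the actual PR code: it assumes the table SerDe implements Hive's `SerDe` interface (i.e. both `Serializer` and `Deserializer`) and that the converted row is described by the converted ObjectInspector, as in the diff above.

```scala
import org.apache.hadoop.hive.serde2.SerDe
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

// Hypothetical helper (illustration only): round-trip a non-lazy converted row
// through the table SerDe so the result is a lazy object again, readable by
// the table's (lazy) ObjectInspector.
def relazify(tableSerDe: SerDe, convertedRow: AnyRef, convertedOI: ObjectInspector): AnyRef = {
  val writable = tableSerDe.serialize(convertedRow, convertedOI)
  tableSerDe.deserialize(writable)
}
```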
I don't think we need to keep this logic here, as we can provide a better solution for partition-based table scanning. All we need to do is convert the `raw object` into a `MutableRow` directly, as we did in `HiveTableScan` in Spark; a rough sketch of that approach follows.
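This is an illustration only, not the actual `HiveTableScan` code: it assumes the deserializer's `StructObjectInspector` and Spark SQL's `GenericMutableRow`, and it simply copies every field to a standard Java object (a real implementation would specialize primitive types instead of boxing).

```scala
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

// Hypothetical helper: fill a reusable MutableRow directly from the object
// returned by the (partition) deserializer, using that deserializer's own
// ObjectInspector. The lazy vs. non-lazy distinction no longer matters,
// because the row only ever holds standard Java objects.
def fillMutableRow(
    rawRow: AnyRef,
    oi: StructObjectInspector,
    mutableRow: GenericMutableRow): GenericMutableRow = {
  val fieldRefs = oi.getAllStructFieldRefs
  var i = 0
  while (i < fieldRefs.size) {
    val fieldRef = fieldRefs.get(i)
    val fieldData = oi.getStructFieldData(rawRow, fieldRef)
    // copyToStandardJavaObject materializes lazy Hive objects into plain Java values.
    mutableRow.update(i,
      ObjectInspectorUtils.copyToStandardJavaObject(fieldData, fieldRef.getFieldObjectInspector))
    i += 1
  }
  mutableRow
}
```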