[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-16 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/1408#issuecomment-49138529
  
@yhuai @concretevitamin @rxin I've create another PR for this follow up, we 
can discuss this more at:
https://github.com/apache/spark/pull/1439


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48930414
  
Thanks for reviewing this everyone.  I'm all for commenting and cleaning 
things up here, but if possible I'd like to merge this in today.  There are a 
couple of people blocking on this as its a pretty severe performance bug.  How 
about we just add some TODOs that can be taken care of in a follow up PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
Github user concretevitamin closed the pull request at:

https://github.com/apache/spark/pull/1390


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
Github user concretevitamin commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48935494
  
@yhuai suggested a much simpler fix -- I benchmarked this and it gave the 
same performance boost. I am closing this and opening a new PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
GitHub user concretevitamin opened a pull request:

https://github.com/apache/spark/pull/1408

[SPARK-2443][SQL] Fix slow read from partitioned tables

This fix obtains a comparable performance boost as [PR 
#1390](https://github.com/apache/spark/pull/1390) by moving an array update and 
deserializer initialization out of a potentially very long loop. Suggested by 
@yhuai. The below results are updated for this fix.

## Benchmarks
Generated a local text file with 10M rows of simple key-value pairs. The 
data is loaded as a table through Hive. Results are obtained on my local 
machine using hive/console.

Without the fix:

Type | Non-partitioned | Partitioned (1 part)
 |  | -
First run | 9.52s end-to-end (1.64s Spark job) | 36.6s (28.3s)
Stablized runs | 1.21s (1.18s) | 27.6s (27.5s)

With this fix:

Type | Non-partitioned | Partitioned (1 part)
 |  | -
First run | 9.57s (1.46s) | 11.0s (1.69s)
Stablized runs | 1.13s (1.10s) | 1.23s (1.19s)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/concretevitamin/spark slow-read-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1408.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1408


commit d86e437218f99179934ccd9b4d5d89c02b09459d
Author: Zongheng Yang zonghen...@gmail.com
Date:   2014-07-14T18:03:07Z

Move update  initialization out of potentially long loop.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
Github user concretevitamin commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48936743
  
New PR here: #1408 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
Github user concretevitamin commented on the pull request:

https://github.com/apache/spark/pull/1408#issuecomment-48936856
  
Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1408#issuecomment-48937213
  
QA tests have started for PR 1408. This patch merges cleanly. brView 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16631/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
Github user concretevitamin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14894946
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
--- End diff --

I initially thought in a function context, `{ case x = ... }` will be 
optimized to `{ x = ... }`. I did a `scalac -print` on a simple program to 
confirm that this is not the case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14898638
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
--- End diff --

oh really?  how does the generated bytecode differ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1408#issuecomment-48951901
  
QA results for PR 1408:br- This patch PASSES unit tests.br- This patch 
merges cleanlybr- This patch adds no public classesbrbrFor more 
information see test 
ouptut:brhttps://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16631/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1408#issuecomment-48954454
  
Thanks!  I've merged this into both master and 1.0.

Are there other followup thing we want to fix from the discussion on the 
other PR?  or should I consider this done?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1408


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
Github user concretevitamin commented on the pull request:

https://github.com/apache/spark/pull/1408#issuecomment-48954674
  
I think we should ask the users who reported the performance issue if this 
fix solves their problems. Otherwise the comments in the previous PR seem to 
only apply to that implementation. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
Github user concretevitamin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14902569
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
--- End diff --

https://gist.github.com/concretevitamin/272fe413dcc06b8cbe9c

It seems the with-case version does have more instructions to do.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread concretevitamin
Github user concretevitamin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14902570
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
--- End diff --

https://gist.github.com/concretevitamin/272fe413dcc06b8cbe9c

It seems the with-case version does have more instructions to do.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/1408#issuecomment-48978522
  
This will works in most of cases I think. But it may raise exceptions if 
the Table's Deserializer differs from the partition's Deserializer, since they 
may have different StructObjectInspector accordingly. 
I will create another PR to fix that soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-14 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/1408#issuecomment-48979538
  
@chenghao-intel Can you ping me after you create the PR or the JIRA? 
Thanks:)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48832990
  
@yhuai can you take a look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14856885
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
--- End diff --

is the pattern matching here necessary?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48845188
  
I am reviewing it. Will comment it later today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48859675
  
The code looks good to me. However, I think we can avoid the work around 
solution (de-serializing (with partition serde) and then serialize (with table 
serde) again) for adapting the higher level table scan (`TableScanOperator` in 
Shark), which have to providing a unique `ObjectInspector` for the downstream 
Operators.

Not like `TableScanOperator`, `HiveTableScan` in `Spark-Hive` doesn't reply 
on `ObjectInspector`, 
and its output type is `GenericMutableRow`, I think we could make the 
object conversion (from raw type to `Row` object) directly.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48859842
  
And as the Hive SerDe actually provides the feature of `lazy` parsing, 
hence during the converting of `raw object` to `Row`, we need to support the 
column pruning

Sorry, some high level comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48860018
  
@chenghao-intel I am not sure I understand your comment on column pruning. 
I think for a Hive table, we should use `ColumnProjectionUtils` to set needed 
columns. So, RCFile and ORC can just read needed columns from HDFS.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14862289
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
 val hconf = broadcastedHiveConf.value.value
 val rowWithPartArr = new Array[Object](2)
-// Map each tuple to a row object
-iter.map { value =
-  val deserializer = localDeserializer.newInstance()
-  deserializer.initialize(hconf, partProps)
-  val deserializedRow = deserializer.deserialize(value)
-  rowWithPartArr.update(0, deserializedRow)
-  rowWithPartArr.update(1, partValues)
-  rowWithPartArr.asInstanceOf[Object]
+
+val partSerDe = localDeserializer.newInstance()
+val tableSerDe = tableSerDeClass.newInstance()
+partSerDe.initialize(hconf, partProps)
+tableSerDe.initialize(hconf,  tableDesc.getProperties)
+
+val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
+  partSerDe.getObjectInspector, tableSerDe.getObjectInspector, 
true)
+  .asInstanceOf[StructObjectInspector]
+val partTblObjectInspectorConverter = 
ObjectInspectorConverters.getConverter(
+  partSerDe.getObjectInspector, tblConvertedOI)
+
+// This is done per partition, and unnecessary to put it in the 
iterations (in iter.map).
+rowWithPartArr.update(1, partValues)
+
+// Map each tuple to a row object.
+if 
(partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
+  iter.map { case value =
+rowWithPartArr.update(0, partSerDe.deserialize(value))
+rowWithPartArr.asInstanceOf[Object]
+  }
+} else {
+  iter.map { case value =
+val deserializedRow = {
+  // If partition schema does not match table schema, update 
the row to match.
+  val convertedRow =
+
partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+
+  // If conversion was performed, convertedRow will be a 
standard Object, but if
+  // conversion wasn't necessary, it will still be lazy. We 
can't have both across
+  // partitions, so we serialize and deserialize again to make 
it lazy.
+  if (tableSerDe.isInstanceOf[OrcSerde]) {
+convertedRow
+  } else {
+convertedRow match {
+  case _: LazyStruct = convertedRow
+  case _: HiveColumnarStruct = convertedRow
+  case _ = tableSerDe.deserialize(
+
tableSerDe.asInstanceOf[Serializer].serialize(convertedRow, tblConvertedOI))
--- End diff --

As mentioned by @chenghao-intel, can we avoid it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14862300
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
 val hconf = broadcastedHiveConf.value.value
 val rowWithPartArr = new Array[Object](2)
-// Map each tuple to a row object
-iter.map { value =
-  val deserializer = localDeserializer.newInstance()
-  deserializer.initialize(hconf, partProps)
-  val deserializedRow = deserializer.deserialize(value)
-  rowWithPartArr.update(0, deserializedRow)
-  rowWithPartArr.update(1, partValues)
-  rowWithPartArr.asInstanceOf[Object]
+
+val partSerDe = localDeserializer.newInstance()
+val tableSerDe = tableSerDeClass.newInstance()
+partSerDe.initialize(hconf, partProps)
+tableSerDe.initialize(hconf,  tableDesc.getProperties)
+
+val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
+  partSerDe.getObjectInspector, tableSerDe.getObjectInspector, 
true)
+  .asInstanceOf[StructObjectInspector]
+val partTblObjectInspectorConverter = 
ObjectInspectorConverters.getConverter(
+  partSerDe.getObjectInspector, tblConvertedOI)
+
+// This is done per partition, and unnecessary to put it in the 
iterations (in iter.map).
+rowWithPartArr.update(1, partValues)
+
+// Map each tuple to a row object.
+if 
(partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
+  iter.map { case value =
+rowWithPartArr.update(0, partSerDe.deserialize(value))
+rowWithPartArr.asInstanceOf[Object]
+  }
+} else {
+  iter.map { case value =
+val deserializedRow = {
+  // If partition schema does not match table schema, update 
the row to match.
+  val convertedRow =
+
partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+
+  // If conversion was performed, convertedRow will be a 
standard Object, but if
+  // conversion wasn't necessary, it will still be lazy. We 
can't have both across
+  // partitions, so we serialize and deserialize again to make 
it lazy.
+  if (tableSerDe.isInstanceOf[OrcSerde]) {
+convertedRow
--- End diff --

Why do we need to take care ORC separately?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14862338
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
 val hconf = broadcastedHiveConf.value.value
 val rowWithPartArr = new Array[Object](2)
-// Map each tuple to a row object
-iter.map { value =
-  val deserializer = localDeserializer.newInstance()
-  deserializer.initialize(hconf, partProps)
-  val deserializedRow = deserializer.deserialize(value)
-  rowWithPartArr.update(0, deserializedRow)
-  rowWithPartArr.update(1, partValues)
-  rowWithPartArr.asInstanceOf[Object]
+
+val partSerDe = localDeserializer.newInstance()
+val tableSerDe = tableSerDeClass.newInstance()
+partSerDe.initialize(hconf, partProps)
+tableSerDe.initialize(hconf,  tableDesc.getProperties)
+
+val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
+  partSerDe.getObjectInspector, tableSerDe.getObjectInspector, 
true)
+  .asInstanceOf[StructObjectInspector]
+val partTblObjectInspectorConverter = 
ObjectInspectorConverters.getConverter(
+  partSerDe.getObjectInspector, tblConvertedOI)
+
+// This is done per partition, and unnecessary to put it in the 
iterations (in iter.map).
+rowWithPartArr.update(1, partValues)
+
+// Map each tuple to a row object.
+if 
(partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
+  iter.map { case value =
+rowWithPartArr.update(0, partSerDe.deserialize(value))
+rowWithPartArr.asInstanceOf[Object]
+  }
+} else {
+  iter.map { case value =
+val deserializedRow = {
+  // If partition schema does not match table schema, update 
the row to match.
+  val convertedRow =
+
partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+
+  // If conversion was performed, convertedRow will be a 
standard Object, but if
+  // conversion wasn't necessary, it will still be lazy. We 
can't have both across
+  // partitions, so we serialize and deserialize again to make 
it lazy.
+  if (tableSerDe.isInstanceOf[OrcSerde]) {
+convertedRow
+  } else {
+convertedRow match {
--- End diff --

I think we need to comment why we need to do this pattern matching. Also, 
why do we handle `LazyStruct` and `ColumnarStruct` specially? There are similar 
classes, e.g. `LazyBinaryStruct` and `LazyBinaryColumnarStruct`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-13 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/1390#discussion_r14862941
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
 
   // Create local references so that the outer object isn't serialized.
   val tableDesc = _tableDesc
+  val tableSerDeClass = tableDesc.getDeserializerClass
+
   val broadcastedHiveConf = _broadcastedHiveConf
   val localDeserializer = partDeserializer
 
   val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-  hivePartitionRDD.mapPartitions { iter =
+  hivePartitionRDD.mapPartitions { case iter =
 val hconf = broadcastedHiveConf.value.value
 val rowWithPartArr = new Array[Object](2)
-// Map each tuple to a row object
-iter.map { value =
-  val deserializer = localDeserializer.newInstance()
-  deserializer.initialize(hconf, partProps)
-  val deserializedRow = deserializer.deserialize(value)
-  rowWithPartArr.update(0, deserializedRow)
-  rowWithPartArr.update(1, partValues)
-  rowWithPartArr.asInstanceOf[Object]
+
+val partSerDe = localDeserializer.newInstance()
+val tableSerDe = tableSerDeClass.newInstance()
+partSerDe.initialize(hconf, partProps)
+tableSerDe.initialize(hconf,  tableDesc.getProperties)
+
+val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
+  partSerDe.getObjectInspector, tableSerDe.getObjectInspector, 
true)
+  .asInstanceOf[StructObjectInspector]
+val partTblObjectInspectorConverter = 
ObjectInspectorConverters.getConverter(
+  partSerDe.getObjectInspector, tblConvertedOI)
+
+// This is done per partition, and unnecessary to put it in the 
iterations (in iter.map).
+rowWithPartArr.update(1, partValues)
+
+// Map each tuple to a row object.
+if 
(partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
+  iter.map { case value =
+rowWithPartArr.update(0, partSerDe.deserialize(value))
+rowWithPartArr.asInstanceOf[Object]
+  }
+} else {
+  iter.map { case value =
+val deserializedRow = {
+  // If partition schema does not match table schema, update 
the row to match.
+  val convertedRow =
+
partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+
+  // If conversion was performed, convertedRow will be a 
standard Object, but if
+  // conversion wasn't necessary, it will still be lazy. We 
can't have both across
+  // partitions, so we serialize and deserialize again to make 
it lazy.
+  if (tableSerDe.isInstanceOf[OrcSerde]) {
+convertedRow
+  } else {
+convertedRow match {
--- End diff --

Yeah, the code is from Shark, and it is a little bit tricky. I think the 
logic here is:
* We assumes the table object inspector is always a lazy objectinspector, 
and the deserializer always produces the lazy objects
* We assumes the ObjectInspectorConverter always produces NON LAZY objects 
if the objectinspectors ARE NOT compatible.
* If `convertedRow` is the lazy object, which means partition 
objectinspector is compatible with the table objectinspector(`convertedRow` can 
be retrieved directly), otherwise, the non lazy `convertedRow` is not 
acceptable by the table object inspector( table object inspector is the lazy 
object inspector), hence we need to convert it by serializing and 
de-serializing again.

I don't think we need to maintain the logic here, as we can provide a 
better solution for the partition based table scanning. All we need to do is 
converting the `raw object` into `MutableRow` directly, as we did in 
`HiveTableScan` of Spark.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-12 Thread concretevitamin
Github user concretevitamin commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48830080
  
Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-12 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48830138
  
QA tests have started for PR 1390. This patch merges cleanly. brView 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16599/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

2014-07-12 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1390#issuecomment-48831466
  
QA results for PR 1390:br- This patch PASSES unit tests.br- This patch 
merges cleanlybr- This patch adds no public classesbrbrFor more 
information see test 
ouptut:brhttps://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16599/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---