Github user esoroush commented on a diff in the pull request:
https://github.com/apache/spark/pull/456#discussion_r11863253
--- Diff:
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala ---
@@ -123,40 +159,88 @@ case class HiveTableScan(
* @return Partitions that are involved in the query plan.
*/
private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
+ /** mutable row implementation to avoid creating row instance at
+ * each iteration inside the while loop.
+ */
+ val row = new GenericMutableRow(attributes.length)
boundPruningPred match {
case None => partitions
case Some(shouldKeep) => partitions.filter { part =>
- val dataTypes = relation.partitionKeys.map(_.dataType)
- val castedValues = for ((value, dataType) <-
part.getValues.zip(dataTypes)) yield {
- castFromString(value, dataType)
+ val castedValues = mutable.ArrayBuffer[Any]()
+ var i = 0
+ var len = relation.partitionKeys.length
+ val iter: Iterator[String] = part.getValues.iterator
+ while (i < len) {
+ castedValues +=
castFromString(iter.next,relation.partitionKeys(i).dataType)
+ i += 1
}
-
// Only partitioned values are needed here, since the predicate
has already been bound to
// partition key attribute references.
- val row = new GenericRow(castedValues.toArray)
+ i = 0
+ len = castedValues.length
+ // castedValues represents columns in the row.
+ while (i < len) {
+ castedValues(i) match {
+ case n: String if n.toLowerCase == "null" => row.setNullAt(i)
+ case n: Boolean => row.setBoolean(i,n)
+ case n: Byte => row.setByte(i,n)
+ case n: Double => row.setDouble(i,n)
+ case n: Float => row.setFloat(i,n)
+ case n: Int => row.setInt(i,n)
+ case n: Long => row.setLong(i,n)
+ case n: String => row.setString(i,n)
+ case n: Short => row.setShort(i,n)
+ case other => row.update(i,other)
+ }
+ i += 1
+ }
shouldKeep.eval(row).asInstanceOf[Boolean]
}
}
}
def execute() = {
- inputRdd.map { row =>
- val values = row match {
- case Array(deserializedRow: AnyRef, partitionKeys: Array[String])
=>
- attributeFunctions.map(_(deserializedRow, partitionKeys))
- case deserializedRow: AnyRef =>
- attributeFunctions.map(_(deserializedRow, Array.empty))
+ /**
+ * mutableRow is GenericMutableRow type and only created once.
+ * mutableRow is upadted at each iteration inside the while loop.
+ */
+ val mutableRow = new GenericMutableRow(attributes.length)
+ var i = 0
+
+ var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
+ /** rddBuffer keeps track of all the transformed rows.
+ * needed later to create finalRdd
+ */
+ val rddBuffer = mutable.ArrayBuffer[Row]()
+ while (iter.hasNext) {
+ val row = iter.next()
+ val values = row match {
+ case Array(deserializedRow: AnyRef, partitionKeys:
Array[String]) =>
+ attributeFunctions.map(_(deserializedRow, partitionKeys))
+ case deserializedRow: AnyRef =>
+ attributeFunctions.map(_(deserializedRow, Array.empty))
+ }
+ i = 0
+ val len = values.length
+ while ( i < len ) {
+ values(i) match {
+ case n: String if n.toLowerCase == "null" =>
mutableRow.setNullAt(i)
+ case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar
=>
+ mutableRow.update(i,varchar.getValue)
+ case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal
=>
+ mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
+ case other => mutableRow.update(i,other)
+ }
+ i += 1
+ }
+ rddBuffer += mutableRow
--- End diff --
Thanks Matei for the comments. I will address it today whenever I find a
time.
My major problem is the following. I appreciate the suggestions.
def execute() = {
/**
* mutableRow is GenericMutableRow type and only created once.
* mutableRow is upadted at each iteration inside the while loop.
*/
val mutableRow = new GenericMutableRow(attributes.length)
val res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
/** rddBuffer keeps track of all the transformed rows.
* needed later to create finalRdd
*/
val rddBuffer = mutable.ArrayBuffer[Row]()
while (iter.hasNext) {
val row = iter.next()
val values = row match {
case Array(deserializedRow: AnyRef, partitionKeys: Array[String])
=>
attributeFunctions.map(_(deserializedRow, partitionKeys))
case deserializedRow: AnyRef =>
attributeFunctions.map(_(deserializedRow, Array.empty))
}
var i = 0
val len = values.length
while ( i < len ) {
values(i) match {
case n: String if n.toLowerCase == "null" =>
mutableRow.setNullAt(i)
case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar
=>
mutableRow.update(i,varchar.getValue)
case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal
=>
mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
case other => mutableRow.update(i,other)
}
i += 1
}
rddBuffer += new GenericRow(mutableRow.toArray)
}
rddBuffer
})
/** finalRdd ... equivalent to Rdd generated from inputRdd.map(...) */
val concatArray = Array.concat[mutable.ArrayBuffer[Row]](res)
inputRdd.context.makeRDD(concatArray)
}
In the execute() method of hiveOperators.scala, I take an iterator over all
the partitions in the inputRDD and pass it to the runJob() method as argument.
The return from runJob() method is an Array[ArrayBuffer[Row]]. I need to
generate the output rdd from that and the only way I can think of is
concatenating all the ArrayBuffer[Row]s and generating one Array[Row] and then
generating a new finalRDD from that. I don't think this is the right way to do
that ( by doing concatenation, we are kind of localizing the already
partitioned data and then again partition it by creating a new RDD ) ... Does
anyone have a suggestion?
This is the main reason that I cannot pass the regression tests ...
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---