Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/456#discussion_r11864083
  
    --- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala ---
    @@ -123,40 +159,88 @@ case class HiveTableScan(
        * @return Partitions that are involved in the query plan.
        */
       private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
    +    /** mutable row implementation to avoid creating row instance at
    +     *  each iteration inside the while loop.
    +     */
    +    val row = new GenericMutableRow(attributes.length)
         boundPruningPred match {
           case None => partitions
           case Some(shouldKeep) => partitions.filter { part =>
    -        val dataTypes = relation.partitionKeys.map(_.dataType)
    -        val castedValues = for ((value, dataType) <- 
part.getValues.zip(dataTypes)) yield {
    -          castFromString(value, dataType)
    +        val castedValues = mutable.ArrayBuffer[Any]()
    +        var i = 0
    +        var len = relation.partitionKeys.length
    +        val iter: Iterator[String] = part.getValues.iterator
    +        while (i < len) {
    +          castedValues += 
castFromString(iter.next,relation.partitionKeys(i).dataType)
    +          i += 1
             }
    -
             // Only partitioned values are needed here, since the predicate 
has already been bound to
             // partition key attribute references.
    -        val row = new GenericRow(castedValues.toArray)
    +        i = 0
    +        len = castedValues.length
    +        // castedValues represents columns in the row.
    +        while (i < len) {
    +          castedValues(i) match {
    +            case n: String if n.toLowerCase == "null" => row.setNullAt(i)
    +            case n: Boolean => row.setBoolean(i,n)
    +            case n: Byte => row.setByte(i,n)
    +            case n: Double => row.setDouble(i,n)
    +            case n: Float => row.setFloat(i,n)
    +            case n: Int => row.setInt(i,n)
    +            case n: Long => row.setLong(i,n)
    +            case n: String  => row.setString(i,n)
    +            case n: Short  => row.setShort(i,n)
    +            case other => row.update(i,other)
    +          }
    +          i += 1
    +        }
             shouldKeep.eval(row).asInstanceOf[Boolean]
           }
         }
       }
     
       def execute() = {
    -    inputRdd.map { row =>
    -      val values = row match {
    -        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) 
=>
    -          attributeFunctions.map(_(deserializedRow, partitionKeys))
    -        case deserializedRow: AnyRef =>
    -          attributeFunctions.map(_(deserializedRow, Array.empty))
    +    /**
    +     *  mutableRow is GenericMutableRow type and only created once.
    +     *  mutableRow is upadted at each iteration inside the while loop. 
    +     */ 
    +    val mutableRow = new GenericMutableRow(attributes.length) 
    +    var i = 0
    +    
    +    var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
    +      /** rddBuffer keeps track of all the transformed rows.
    +       *  needed later to create finalRdd 
    +       */ 
    +      val rddBuffer = mutable.ArrayBuffer[Row]()
    +      while (iter.hasNext) {
    +        val row = iter.next()
    +        val values = row match {
    +          case Array(deserializedRow: AnyRef, partitionKeys: 
Array[String]) =>
    +            attributeFunctions.map(_(deserializedRow, partitionKeys))
    +          case deserializedRow: AnyRef =>
    +            attributeFunctions.map(_(deserializedRow, Array.empty))
    +        }
    +        i = 0
    +        val len = values.length
    +        while ( i < len ) {
    +          values(i) match {
    +            case n: String if n.toLowerCase == "null" => 
mutableRow.setNullAt(i)
    +            case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar 
=> 
    +              mutableRow.update(i,varchar.getValue)
    +            case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal 
=>
    +              mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
    +            case other => mutableRow.update(i,other)
    +          }
    +          i += 1
    +        }
    +        rddBuffer +=  mutableRow
    --- End diff --
    
    Why not just use mapPartitions and work on one partition at a time? You 
don't want to collect all the data back to the driver.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to