Asif Hussain Shahid created SPARK-13116:
-------------------------------------------

             Summary: TungstenAggregate, though supposedly capable of processing both unsafe and safe rows, fails if the input is safe rows
                 Key: SPARK-13116
                 URL: https://issues.apache.org/jira/browse/SPARK-13116
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.0
            Reporter: Asif Hussain Shahid


TungstenAggregate, though supposedly capable of processing both unsafe and safe rows, fails if the input consists of safe rows.

If the input to TungstenAggregateIterator is a safe row while the target is an UnsafeRow, the current code tries to set the fields of the target through the generic update method of UnsafeRow.
This happens because TungstenAggregateIterator applies an InterpretedMutableProjection whose target row is an UnsafeRow while the current input row is a safe row.
In InterpretedMutableProjection's apply method, it invokes

  mutableRow(i) = exprArray(i).eval(input)

and for an UnsafeRow target this resolves to update, which throws UnsupportedOperationException.
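A minimal sketch of the failing call, assuming the Spark 1.6 catalyst APIs UnsafeProjection.create and InternalRow.apply; the generic update is exactly what the assignment mutableRow(i) = ... desugars to:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DataType, IntegerType}

object UnsafeRowUpdateSketch {
  def main(args: Array[String]): Unit = {
    // Build a one-column UnsafeRow through a generated UnsafeProjection.
    val row: UnsafeRow = UnsafeProjection.create(Array[DataType](IntegerType))(InternalRow(1))

    row.setInt(0, 2)  // type-specific setter: supported
    row.update(0, 3)  // what mutableRow(i) = value resolves to: throws UnsupportedOperationException
  }
}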

The proposed fix I made on our forked branch, in the class InterpretedMutableProjection, is:
+  private var targetUnsafe = false
+  type UnsafeSetter = (UnsafeRow, Any) => Unit
+  private var setters: Array[UnsafeSetter] = _
   private[this] val exprArray = expressions.toArray
   private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
   def currentValue: InternalRow = mutableRow

   override def target(row: MutableRow): MutableProjection = {
     mutableRow = row
+    targetUnsafe = row match {
+      case _: UnsafeRow =>
+        if (setters == null) {
+          setters = Array.ofDim[UnsafeSetter](exprArray.length)
+          for (i <- 0 until exprArray.length) {
+            setters(i) = exprArray(i).dataType match {
+              case IntegerType => (target: UnsafeRow, value: Any) =>
+                target.setInt(i, value.asInstanceOf[Int])
+              case LongType => (target: UnsafeRow, value: Any) =>
+                target.setLong(i, value.asInstanceOf[Long])
+              case DoubleType => (target: UnsafeRow, value: Any) =>
+                target.setDouble(i, value.asInstanceOf[Double])
+              case FloatType => (target: UnsafeRow, value: Any) =>
+                target.setFloat(i, value.asInstanceOf[Float])
+              case NullType => (target: UnsafeRow, value: Any) =>
+                target.setNullAt(i)
+              case BooleanType => (target: UnsafeRow, value: Any) =>
+                target.setBoolean(i, value.asInstanceOf[Boolean])
+              case ByteType => (target: UnsafeRow, value: Any) =>
+                target.setByte(i, value.asInstanceOf[Byte])
+              case ShortType => (target: UnsafeRow, value: Any) =>
+                target.setShort(i, value.asInstanceOf[Short])
+            }
+          }
+        }
+        true
+      case _ => false
+    }
+
     this
   }

   override def apply(input: InternalRow): InternalRow = {
     var i = 0
     while (i < exprArray.length) {
-      mutableRow(i) = exprArray(i).eval(input)
+      if (targetUnsafe) {
+        setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
+      } else {
+        mutableRow(i) = exprArray(i).eval(input)
+      }
       i += 1
     }
     mutableRow
   }
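For illustration, a self-contained sketch of the same per-type setter dispatch outside the projection classes; the object name and field list here are made up, and it assumes the Spark 1.6 catalyst APIs UnsafeProjection.create and InternalRow.apply:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._

object UnsafeRowSetterSketch {
  def main(args: Array[String]): Unit = {
    val fieldTypes: Array[DataType] = Array(IntegerType, LongType)

    // An UnsafeRow with the matching layout, produced by a generated UnsafeProjection.
    val row: UnsafeRow = UnsafeProjection.create(fieldTypes)(InternalRow(0, 0L))

    // One setter per field, chosen once from the field's data type,
    // mirroring the setters array in the proposed fix.
    val setters: Array[(UnsafeRow, Any) => Unit] = fieldTypes.zipWithIndex.map {
      case (IntegerType, i) => (t: UnsafeRow, v: Any) => t.setInt(i, v.asInstanceOf[Int])
      case (LongType, i)    => (t: UnsafeRow, v: Any) => t.setLong(i, v.asInstanceOf[Long])
      case (_, i)           => (t: UnsafeRow, v: Any) => t.setNullAt(i)  // sketch-only fallback
    }

    setters(0)(row, 42)
    setters(1)(row, 7L)
    println(s"${row.getInt(0)}, ${row.getLong(1)}")  // prints: 42, 7
  }
}

Note that the per-type setters above cover only fixed-width types; variable-length types such as StringType are not in the match, so a complete fix would presumably need to handle them separately (for example by converting the incoming safe row to an UnsafeRow first).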




