Asif Hussain Shahid created SPARK-13116:
-------------------------------------------
             Summary: TungstenAggregate, though supposedly capable of processing both unsafe & safe rows, fails if the input is safe rows
                 Key: SPARK-13116
                 URL: https://issues.apache.org/jira/browse/SPARK-13116
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.0
            Reporter: Asif Hussain Shahid


TungstenAggregate, though it is supposedly capable of processing both unsafe & safe rows, fails if the input is safe rows.

If the input to TungstenAggregateIterator is a safe row while the target is an UnsafeRow, the current code tries to set the fields of the UnsafeRow using the generic update method of UnsafeRow. This happens when TungstenAggregateIterator applies an InterpretedMutableProjection: the target row of the projection is an UnsafeRow, while the current input row is a safe row. In InterpretedMutableProjection's apply method, the line

    mutableRow(i) = exprArray(i).eval(input)

invokes update on the target row, and for UnsafeRow the update method throws UnsupportedOperationException.

The proposed fix I applied on our forked branch, in the class InterpretedMutableProjection, is:

+  private var targetUnsafe = false
+  type UnsafeSetter = (UnsafeRow, Any) => Unit
+  private var setters: Array[UnsafeSetter] = _

   private[this] val exprArray = expressions.toArray
   private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)

   def currentValue: InternalRow = mutableRow

+  override def target(row: MutableRow): MutableProjection = {
     mutableRow = row
+    targetUnsafe = row match {
+      case _: UnsafeRow =>
+        if (setters == null) {
+          setters = Array.ofDim[UnsafeSetter](exprArray.length)
+          for (i <- 0 until exprArray.length) {
+            setters(i) = exprArray(i).dataType match {
+              case IntegerType => (target: UnsafeRow, value: Any) =>
+                target.setInt(i, value.asInstanceOf[Int])
+              case LongType => (target: UnsafeRow, value: Any) =>
+                target.setLong(i, value.asInstanceOf[Long])
+              case DoubleType => (target: UnsafeRow, value: Any) =>
+                target.setDouble(i, value.asInstanceOf[Double])
+              case FloatType => (target: UnsafeRow, value: Any) =>
+                target.setFloat(i, value.asInstanceOf[Float])
+              case NullType => (target: UnsafeRow, value: Any) =>
+                target.setNullAt(i)
+              case BooleanType => (target: UnsafeRow, value: Any) =>
+                target.setBoolean(i, value.asInstanceOf[Boolean])
+              case ByteType => (target: UnsafeRow, value: Any) =>
+                target.setByte(i, value.asInstanceOf[Byte])
+              case ShortType => (target: UnsafeRow, value: Any) =>
+                target.setShort(i, value.asInstanceOf[Short])
+            }
+          }
+        }
+        true
+      case _ => false
+    }
+    this
+  }

   override def apply(input: InternalRow): InternalRow = {
     var i = 0
     while (i < exprArray.length) {
-      mutableRow(i) = exprArray(i).eval(input)
+      if (targetUnsafe) {
+        setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
+      } else {
+        mutableRow(i) = exprArray(i).eval(input)
+      }
       i += 1
     }

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
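To illustrate the idea behind the patch in isolation, here is a minimal, self-contained Scala sketch. It is NOT Spark's actual code: FixedWidthRow, SetterDispatch, and the two-type schema are hypothetical stand-ins for UnsafeRow and the projection. It shows the same pattern: precompute one typed setter closure per column from the schema, then write evaluated values through those closures instead of the generic update(), which a fixed-width row does not support.

```scala
// Hypothetical stand-in for UnsafeRow: typed setters work, generic update throws.
final class FixedWidthRow(numFields: Int) {
  private val words = new Array[Long](numFields)
  def setInt(i: Int, v: Int): Unit = words(i) = v.toLong
  def setLong(i: Int, v: Long): Unit = words(i) = v
  def getLong(i: Int): Long = words(i)
  def update(i: Int, v: Any): Unit =
    throw new UnsupportedOperationException("generic update not supported")
}

sealed trait DataType
case object IntegerType extends DataType
case object LongType extends DataType

object SetterDispatch {
  type Setter = (FixedWidthRow, Any) => Unit

  // Built once per target row, analogous to the setters array in the patch:
  // the match on the column's data type happens here, not per input row.
  def buildSetters(schema: Array[DataType]): Array[Setter] =
    schema.zipWithIndex.map {
      case (IntegerType, i) => (r: FixedWidthRow, v: Any) => r.setInt(i, v.asInstanceOf[Int])
      case (LongType, i)    => (r: FixedWidthRow, v: Any) => r.setLong(i, v.asInstanceOf[Long])
    }

  def run(): String = {
    val schema   = Array[DataType](IntegerType, LongType)
    val setters  = buildSetters(schema)
    val row      = new FixedWidthRow(schema.length)
    // Stands in for the values produced by exprArray(i).eval(input).
    val evaluated: Array[Any] = Array(7, 42L)
    var i = 0
    while (i < setters.length) { setters(i)(row, evaluated(i)); i += 1 }
    s"${row.getLong(0)},${row.getLong(1)}"
  }

  def main(args: Array[String]): Unit = println(run()) // prints 7,42
}
```

The design point is the same as in the proposed fix: dispatching on the data type once per target row keeps the per-value write path to a single closure call and a typed setter, avoiding both the per-row type match and the unsupported generic update.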