GitHub user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19082#discussion_r143359416

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -797,26 +904,44 @@ case class HashAggregateExec(
 
   def updateRowInFastHashMap(isVectorized: Boolean): Option[String] = {
-    ctx.INPUT_ROW = fastRowBuffer
+    // We need to copy the aggregation row buffer to a local row first because each aggregate
+    // function directly updates the buffer when it finishes.
+    val localRowBuffer = ctx.freshName("localFastRowBuffer")
+    val initLocalRowBuffer = s"InternalRow $localRowBuffer = $fastRowBuffer.copy();"
--- End diff --

I just passed the local variable as an argument to each function:
```
/* 329 */ // do aggregate
/* 330 */ // copy aggregation row buffer to the local
/* 331 */ InternalRow agg_localFastRowBuffer = agg_fastAggBuffer.copy();
/* 332 */ // common sub-expressions
/* 333 */ boolean agg_isNull27 = false;
/* 334 */ long agg_value30 = -1L;
/* 335 */ if (!false) {
/* 336 */   agg_value30 = (long) inputadapter_value;
/* 337 */ }
/* 338 */ // process aggregate functions to update aggregation buffer
/* 339 */ agg_doAggregateVal_add2(inputadapter_value, agg_value30, agg_fastAggBuffer, agg_localFastRowBuffer, agg_isNull27);
/* 340 */ agg_doAggregateVal_add3(inputadapter_value, agg_value30, agg_fastAggBuffer, agg_localFastRowBuffer, agg_isNull27);
/* 341 */ agg_doAggregateVal_if1(inputadapter_value, agg_value30, agg_fastAggBuffer, agg_localFastRowBuffer, agg_isNull27);
/* 342 */
```
Since each split function directly updates its input row, we need to copy the buffer to a local row first so that all the split functions can reference the old state.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org