Github user cloud-fan commented on the pull request:
https://github.com/apache/spark/pull/12067#issuecomment-206763439
Generated code snippet from mutable projection codegen for an Aggregator (UDAF) with a complex buffer type:
```
object ComplexResultAgg extends Aggregator[(String, Int), (Long, Long),
(Long, Long)] {
override def zero: (Long, Long) = (0, 0)
override def reduce(countAndSum: (Long, Long), input: (String, Int)):
(Long, Long) = {
(countAndSum._1 + 1, countAndSum._2 + input._2)
}
override def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) = {
(b1._1 + b2._1, b1._2 + b2._2)
}
override def finish(reduction: (Long, Long)): (Long, Long) = reduction
}
val ds = Seq("a" -> 1, "a" -> 3, "b" -> 3).toDS()
ds.groupByKey(_._1).agg(ComplexResultAgg.toColumn)
```
The generated mutable projection code for the merge step is:
```
/* 033 */ /* bufferexpression(input[0, scala.Tuple2]._1 AS _1#29L,
input[0, scala.Tuple2]._2 AS _2#30L, org.apache.spark.sql.ComplexResultAgg... */
/* 034 */ /* [email protected] */
/* 035 */ /* org.apache.spark.sql.ComplexResultAgg$@3f33d0f3 */
/* 036 */ /* expression:
org.apache.spark.sql.ComplexResultAgg$@3f33d0f3 */
/* 037 */ Object obj = ((Expression) references[0]).eval(null);
/* 038 */ org.apache.spark.sql.expressions.Aggregator value2 =
(org.apache.spark.sql.expressions.Aggregator) obj;
/* 039 */ /* newInstance(class scala.Tuple2) */
/* 040 */ /* input[0, struct<_1:bigint,_2:bigint>]._1 */
/* 041 */ /* input[0, struct<_1:bigint,_2:bigint>] */
/* 042 */ InternalRow value5 = i.getStruct(0, 2);
/* 043 */ long value4 = -1L;
/* 044 */
/* 045 */ value4 = value5.getLong(0);
/* 046 */ /* input[0, struct<_1:bigint,_2:bigint>]._2 */
/* 047 */ /* input[0, struct<_1:bigint,_2:bigint>] */
/* 048 */ InternalRow value7 = i.getStruct(0, 2);
/* 049 */ long value6 = -1L;
/* 050 */
/* 051 */ value6 = value7.getLong(1);
/* 052 */
/* 053 */
/* 054 */
/* 055 */ final scala.Tuple2 value3 = new scala.Tuple2(value4, value6);
/* 056 */ final boolean isNull3 = false;
/* 057 */ /* newInstance(class scala.Tuple2) */
/* 058 */ /* input[2, struct<_1:bigint,_2:bigint>]._1 */
/* 059 */ /* input[2, struct<_1:bigint,_2:bigint>] */
/* 060 */ InternalRow value10 = i.getStruct(2, 2);
/* 061 */ long value9 = -1L;
/* 062 */
/* 063 */ value9 = value10.getLong(0);
/* 064 */ /* input[2, struct<_1:bigint,_2:bigint>]._2 */
/* 065 */ /* input[2, struct<_1:bigint,_2:bigint>] */
/* 066 */ InternalRow value12 = i.getStruct(2, 2);
/* 067 */ long value11 = -1L;
/* 068 */
/* 069 */ value11 = value12.getLong(1);
/* 070 */
/* 071 */
/* 072 */
/* 073 */ final scala.Tuple2 value8 = new scala.Tuple2(value9, value11);
/* 074 */ final boolean isNull8 = false;
/* 075 */ scala.Tuple2 value1 = false ? null : (scala.Tuple2)
value2.merge(value3, value8);
/* 076 */ boolean isNull1 = value1 == null;
/* 077 */ /* struct(lambdavariable(value1, isNull1, ObjectType(class
scala.Tuple2))._1 AS _1#29L, lambdavariable(value1, isNull1, ObjectType(... */
/* 078 */ boolean isNull13 = false;
/* 079 */ final Object[] values = new Object[2];
/* 080 */ /* lambdavariable(value1, isNull1, ObjectType(class
scala.Tuple2))._1 */
/* 081 */ long value14 = isNull1 ? -1L : (long)
((java.lang.Long)value1._1()).longValue();
/* 082 */ if (isNull1) {
/* 083 */ values[0] = null;
/* 084 */ } else {
/* 085 */ values[0] = value14;
/* 086 */ }
/* 087 */
/* 088 */ /* lambdavariable(value1, isNull1, ObjectType(class
scala.Tuple2))._2 */
/* 089 */ long value15 = isNull1 ? -1L : (long)
((java.lang.Long)value1._2()).longValue();
/* 090 */ if (isNull1) {
/* 091 */ values[1] = null;
/* 092 */ } else {
/* 093 */ values[1] = value15;
/* 094 */ }
/* 095 */ final InternalRow value13 = new
org.apache.spark.sql.catalyst.expressions.GenericInternalRow(values);
/* 096 */ this.value_0 = value13;
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]