GitHub user yucai commented on the pull request:
https://github.com/apache/spark/pull/14481
Example of the generated code (**not ready for code review yet**):
```
scala> Seq(("a", "3"), ("b", "20"), ("b", "2")).toDF("k", "v").agg(max("v")).debugCodegen()
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*SortAggregate(key=[], functions=[partial_max(v#6)], output=[max#18])
+- LocalTableScan [v#6]
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private boolean sagg_initAgg;
/* 008 */ private boolean sagg_bufIsNull;
/* 009 */ private UTF8String sagg_bufValue;
/* 010 */ private org.apache.spark.sql.execution.metric.SQLMetric
sagg_numOutputRows;
/* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric
sagg_aggTime;
/* 012 */ private scala.collection.Iterator inputadapter_input;
/* 013 */ private UnsafeRow sagg_result;
/* 014 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sagg_holder;
/* 015 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
sagg_rowWriter;
/* 016 */
/* 017 */ public GeneratedIterator(Object[] references) {
/* 018 */ this.references = references;
/* 019 */ }
/* 020 */
/* 021 */ public void init(int index, scala.collection.Iterator inputs[])
{
/* 022 */ partitionIndex = index;
/* 023 */ sagg_initAgg = false;
/* 024 */
/* 025 */ this.sagg_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 026 */ this.sagg_aggTime =
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 027 */ inputadapter_input = inputs[0];
/* 028 */ sagg_result = new UnsafeRow(1);
/* 029 */ this.sagg_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sagg_result, 32);
/* 030 */ this.sagg_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sagg_holder,
1);
/* 031 */ }
/* 032 */
/* 033 */ private void sagg_doAggregateWithoutKey() throws
java.io.IOException {
/* 034 */ // initialize aggregation buffer
/* 035 */ final UTF8String sagg_value = null;
/* 036 */ sagg_bufIsNull = true;
/* 037 */ sagg_bufValue = sagg_value;
/* 038 */
/* 039 */ while (inputadapter_input.hasNext()) {
/* 040 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 041 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 042 */ UTF8String inputadapter_value = inputadapter_isNull ? null
: (inputadapter_row.getUTF8String(0));
/* 043 */
/* 044 */ // common sub-expressions
/* 045 */
/* 046 */ // evaluate aggregate function
/* 047 */ boolean sagg_isNull1 = sagg_bufIsNull;
/* 048 */ UTF8String sagg_value1 = sagg_bufValue;
/* 049 */
/* 050 */ if (!inputadapter_isNull && (sagg_isNull1 ||
/* 051 */ (inputadapter_value.compare(sagg_value1)) > 0)) {
/* 052 */ sagg_isNull1 = false;
/* 053 */ sagg_value1 = inputadapter_value;
/* 054 */ }
/* 055 */ // update aggregation buffer
/* 056 */ sagg_bufIsNull = sagg_isNull1;
/* 057 */ if (sagg_bufValue != sagg_value1)
/* 058 */ sagg_bufValue = sagg_value1 == null ? null :
sagg_value1.clone();
/* 059 */ if (shouldStop()) return;
/* 060 */ }
/* 061 */
/* 062 */ }
/* 063 */
/* 064 */ protected void processNext() throws java.io.IOException {
/* 065 */ while (!sagg_initAgg) {
/* 066 */ sagg_initAgg = true;
/* 067 */ long sagg_beforeAgg = System.nanoTime();
/* 068 */ sagg_doAggregateWithoutKey();
/* 069 */ sagg_aggTime.add((System.nanoTime() - sagg_beforeAgg) /
1000000);
/* 070 */
/* 071 */ // output the result
/* 072 */
/* 073 */ sagg_numOutputRows.add(1);
/* 074 */ sagg_holder.reset();
/* 075 */
/* 076 */ sagg_rowWriter.zeroOutNullBytes();
/* 077 */
/* 078 */ if (sagg_bufIsNull) {
/* 079 */ sagg_rowWriter.setNullAt(0);
/* 080 */ } else {
/* 081 */ sagg_rowWriter.write(0, sagg_bufValue);
/* 082 */ }
/* 083 */ sagg_result.setTotalSize(sagg_holder.totalSize());
/* 084 */ append(sagg_result);
/* 085 */ }
/* 086 */ }
/* 087 */ }
== Subtree 2 / 2 ==
*SortAggregate(key=[], functions=[max(v#6)], output=[max(v)#14])
+- Exchange SinglePartition
+- *SortAggregate(key=[], functions=[partial_max(v#6)], output=[max#18])
+- LocalTableScan [v#6]
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private boolean sagg_initAgg;
/* 008 */ private boolean sagg_bufIsNull;
/* 009 */ private UTF8String sagg_bufValue;
/* 010 */ private org.apache.spark.sql.execution.metric.SQLMetric
sagg_numOutputRows;
/* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric
sagg_aggTime;
/* 012 */ private scala.collection.Iterator inputadapter_input;
/* 013 */ private UnsafeRow sagg_result;
/* 014 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sagg_holder;
/* 015 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
sagg_rowWriter;
/* 016 */
/* 017 */ public GeneratedIterator(Object[] references) {
/* 018 */ this.references = references;
/* 019 */ }
/* 020 */
/* 021 */ public void init(int index, scala.collection.Iterator inputs[])
{
/* 022 */ partitionIndex = index;
/* 023 */ sagg_initAgg = false;
/* 024 */
/* 025 */ this.sagg_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 026 */ this.sagg_aggTime =
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 027 */ inputadapter_input = inputs[0];
/* 028 */ sagg_result = new UnsafeRow(1);
/* 029 */ this.sagg_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sagg_result, 32);
/* 030 */ this.sagg_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sagg_holder,
1);
/* 031 */ }
/* 032 */
/* 033 */ private void sagg_doAggregateWithoutKey() throws
java.io.IOException {
/* 034 */ // initialize aggregation buffer
/* 035 */ final UTF8String sagg_value = null;
/* 036 */ sagg_bufIsNull = true;
/* 037 */ sagg_bufValue = sagg_value;
/* 038 */
/* 039 */ while (inputadapter_input.hasNext()) {
/* 040 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 041 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 042 */ UTF8String inputadapter_value = inputadapter_isNull ? null
: (inputadapter_row.getUTF8String(0));
/* 043 */
/* 044 */ // common sub-expressions
/* 045 */
/* 046 */ // evaluate aggregate function
/* 047 */ boolean sagg_isNull3 = sagg_bufIsNull;
/* 048 */ UTF8String sagg_value3 = sagg_bufValue;
/* 049 */
/* 050 */ if (!inputadapter_isNull && (sagg_isNull3 ||
/* 051 */ (inputadapter_value.compare(sagg_value3)) > 0)) {
/* 052 */ sagg_isNull3 = false;
/* 053 */ sagg_value3 = inputadapter_value;
/* 054 */ }
/* 055 */ // update aggregation buffer
/* 056 */ sagg_bufIsNull = sagg_isNull3;
/* 057 */ if (sagg_bufValue != sagg_value3)
/* 058 */ sagg_bufValue = sagg_value3 == null ? null :
sagg_value3.clone();
/* 059 */ if (shouldStop()) return;
/* 060 */ }
/* 061 */
/* 062 */ }
/* 063 */
/* 064 */ protected void processNext() throws java.io.IOException {
/* 065 */ while (!sagg_initAgg) {
/* 066 */ sagg_initAgg = true;
/* 067 */ long sagg_beforeAgg = System.nanoTime();
/* 068 */ sagg_doAggregateWithoutKey();
/* 069 */ sagg_aggTime.add((System.nanoTime() - sagg_beforeAgg) /
1000000);
/* 070 */
/* 071 */ // output the result
/* 072 */
/* 073 */ sagg_numOutputRows.add(1);
/* 074 */ sagg_holder.reset();
/* 075 */
/* 076 */ sagg_rowWriter.zeroOutNullBytes();
/* 077 */
/* 078 */ if (sagg_bufIsNull) {
/* 079 */ sagg_rowWriter.setNullAt(0);
/* 080 */ } else {
/* 081 */ sagg_rowWriter.write(0, sagg_bufValue);
/* 082 */ }
/* 083 */ sagg_result.setTotalSize(sagg_holder.totalSize());
/* 084 */ append(sagg_result);
/* 085 */ }
/* 086 */ }
/* 087 */ }
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]