Github user yucai commented on the issue:

    https://github.com/apache/spark/pull/14481
  
    Generated code example (**not for code review yet**):
    ```
    scala> Seq(("a", "3"), ("b", "20"), ("b", "2")).toDF("k", "v").agg(max("v")).debugCodegen()
    
    Found 2 WholeStageCodegen subtrees.
    == Subtree 1 / 2 ==
    *SortAggregate(key=[], functions=[partial_max(v#6)], output=[max#18])
    +- LocalTableScan [v#6]
    
    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private boolean sagg_initAgg;
    /* 008 */   private boolean sagg_bufIsNull;
    /* 009 */   private UTF8String sagg_bufValue;
    /* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric sagg_numOutputRows;
    /* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric sagg_aggTime;
    /* 012 */   private scala.collection.Iterator inputadapter_input;
    /* 013 */   private UnsafeRow sagg_result;
    /* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sagg_holder;
    /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter sagg_rowWriter;
    /* 016 */
    /* 017 */   public GeneratedIterator(Object[] references) {
    /* 018 */     this.references = references;
    /* 019 */   }
    /* 020 */
    /* 021 */   public void init(int index, scala.collection.Iterator inputs[]) {
    /* 022 */     partitionIndex = index;
    /* 023 */     sagg_initAgg = false;
    /* 024 */
    /* 025 */     this.sagg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    /* 026 */     this.sagg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    /* 027 */     inputadapter_input = inputs[0];
    /* 028 */     sagg_result = new UnsafeRow(1);
    /* 029 */     this.sagg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sagg_result, 32);
    /* 030 */     this.sagg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sagg_holder, 1);
    /* 031 */   }
    /* 032 */
    /* 033 */   private void sagg_doAggregateWithoutKey() throws java.io.IOException {
    /* 034 */     // initialize aggregation buffer
    /* 035 */     final UTF8String sagg_value = null;
    /* 036 */     sagg_bufIsNull = true;
    /* 037 */     sagg_bufValue = sagg_value;
    /* 038 */
    /* 039 */     while (inputadapter_input.hasNext()) {
    /* 040 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 041 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 042 */       UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
    /* 043 */
    /* 044 */       // common sub-expressions
    /* 045 */
    /* 046 */       // evaluate aggregate function
    /* 047 */       boolean sagg_isNull1 = sagg_bufIsNull;
    /* 048 */       UTF8String sagg_value1 = sagg_bufValue;
    /* 049 */
    /* 050 */       if (!inputadapter_isNull && (sagg_isNull1 ||
    /* 051 */           (inputadapter_value.compare(sagg_value1)) > 0)) {
    /* 052 */         sagg_isNull1 = false;
    /* 053 */         sagg_value1 = inputadapter_value;
    /* 054 */       }
    /* 055 */       // update aggregation buffer
    /* 056 */       sagg_bufIsNull = sagg_isNull1;
    /* 057 */       if (sagg_bufValue != sagg_value1)
    /* 058 */       sagg_bufValue = sagg_value1 == null ? null : sagg_value1.clone();
    /* 059 */       if (shouldStop()) return;
    /* 060 */     }
    /* 061 */
    /* 062 */   }
    /* 063 */
    /* 064 */   protected void processNext() throws java.io.IOException {
    /* 065 */     while (!sagg_initAgg) {
    /* 066 */       sagg_initAgg = true;
    /* 067 */       long sagg_beforeAgg = System.nanoTime();
    /* 068 */       sagg_doAggregateWithoutKey();
    /* 069 */       sagg_aggTime.add((System.nanoTime() - sagg_beforeAgg) / 1000000);
    /* 070 */
    /* 071 */       // output the result
    /* 072 */
    /* 073 */       sagg_numOutputRows.add(1);
    /* 074 */       sagg_holder.reset();
    /* 075 */
    /* 076 */       sagg_rowWriter.zeroOutNullBytes();
    /* 077 */
    /* 078 */       if (sagg_bufIsNull) {
    /* 079 */         sagg_rowWriter.setNullAt(0);
    /* 080 */       } else {
    /* 081 */         sagg_rowWriter.write(0, sagg_bufValue);
    /* 082 */       }
    /* 083 */       sagg_result.setTotalSize(sagg_holder.totalSize());
    /* 084 */       append(sagg_result);
    /* 085 */     }
    /* 086 */   }
    /* 087 */ }
    
    == Subtree 2 / 2 ==
    *SortAggregate(key=[], functions=[max(v#6)], output=[max(v)#14])
    +- Exchange SinglePartition
       +- *SortAggregate(key=[], functions=[partial_max(v#6)], output=[max#18])
          +- LocalTableScan [v#6]
    
    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private boolean sagg_initAgg;
    /* 008 */   private boolean sagg_bufIsNull;
    /* 009 */   private UTF8String sagg_bufValue;
    /* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric sagg_numOutputRows;
    /* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric sagg_aggTime;
    /* 012 */   private scala.collection.Iterator inputadapter_input;
    /* 013 */   private UnsafeRow sagg_result;
    /* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sagg_holder;
    /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter sagg_rowWriter;
    /* 016 */
    /* 017 */   public GeneratedIterator(Object[] references) {
    /* 018 */     this.references = references;
    /* 019 */   }
    /* 020 */
    /* 021 */   public void init(int index, scala.collection.Iterator inputs[]) {
    /* 022 */     partitionIndex = index;
    /* 023 */     sagg_initAgg = false;
    /* 024 */
    /* 025 */     this.sagg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    /* 026 */     this.sagg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    /* 027 */     inputadapter_input = inputs[0];
    /* 028 */     sagg_result = new UnsafeRow(1);
    /* 029 */     this.sagg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sagg_result, 32);
    /* 030 */     this.sagg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sagg_holder, 1);
    /* 031 */   }
    /* 032 */
    /* 033 */   private void sagg_doAggregateWithoutKey() throws java.io.IOException {
    /* 034 */     // initialize aggregation buffer
    /* 035 */     final UTF8String sagg_value = null;
    /* 036 */     sagg_bufIsNull = true;
    /* 037 */     sagg_bufValue = sagg_value;
    /* 038 */
    /* 039 */     while (inputadapter_input.hasNext()) {
    /* 040 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 041 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 042 */       UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
    /* 043 */
    /* 044 */       // common sub-expressions
    /* 045 */
    /* 046 */       // evaluate aggregate function
    /* 047 */       boolean sagg_isNull3 = sagg_bufIsNull;
    /* 048 */       UTF8String sagg_value3 = sagg_bufValue;
    /* 049 */
    /* 050 */       if (!inputadapter_isNull && (sagg_isNull3 ||
    /* 051 */           (inputadapter_value.compare(sagg_value3)) > 0)) {
    /* 052 */         sagg_isNull3 = false;
    /* 053 */         sagg_value3 = inputadapter_value;
    /* 054 */       }
    /* 055 */       // update aggregation buffer
    /* 056 */       sagg_bufIsNull = sagg_isNull3;
    /* 057 */       if (sagg_bufValue != sagg_value3)
    /* 058 */       sagg_bufValue = sagg_value3 == null ? null : sagg_value3.clone();
    /* 059 */       if (shouldStop()) return;
    /* 060 */     }
    /* 061 */
    /* 062 */   }
    /* 063 */
    /* 064 */   protected void processNext() throws java.io.IOException {
    /* 065 */     while (!sagg_initAgg) {
    /* 066 */       sagg_initAgg = true;
    /* 067 */       long sagg_beforeAgg = System.nanoTime();
    /* 068 */       sagg_doAggregateWithoutKey();
    /* 069 */       sagg_aggTime.add((System.nanoTime() - sagg_beforeAgg) / 1000000);
    /* 070 */
    /* 071 */       // output the result
    /* 072 */
    /* 073 */       sagg_numOutputRows.add(1);
    /* 074 */       sagg_holder.reset();
    /* 075 */
    /* 076 */       sagg_rowWriter.zeroOutNullBytes();
    /* 077 */
    /* 078 */       if (sagg_bufIsNull) {
    /* 079 */         sagg_rowWriter.setNullAt(0);
    /* 080 */       } else {
    /* 081 */         sagg_rowWriter.write(0, sagg_bufValue);
    /* 082 */       }
    /* 083 */       sagg_result.setTotalSize(sagg_holder.totalSize());
    /* 084 */       append(sagg_result);
    /* 085 */     }
    /* 086 */   }
    /* 087 */ }
    ```
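
    The two subtrees come from the same two-phase plan: subtree 1 is the partial `SortAggregate` below the `Exchange SinglePartition`, and subtree 2 is the final aggregate above it, which is why the two generated classes are nearly identical. For anyone who wants to reproduce the dump outside `spark-shell`, here is a minimal self-contained sketch (the `local[*]` session setup and the object name are illustrative assumptions; `debugCodegen()` itself comes from the `org.apache.spark.sql.execution.debug` implicits):
    ```
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.max
    // debugCodegen() is added to Dataset by the debug package's implicits.
    import org.apache.spark.sql.execution.debug._

    object DebugCodegenExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical local session, just for illustration.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("debug-codegen")
          .getOrCreate()
        import spark.implicits._

        // Same query as above. max over a string column is planned as SortAggregate
        // because a UTF8String buffer is not supported by the hash-based aggregate's
        // mutable UnsafeRow buffer.
        Seq(("a", "3"), ("b", "20"), ("b", "2")).toDF("k", "v")
          .agg(max("v"))
          .debugCodegen()

        spark.stop()
      }
    }
    ```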

