Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/11484#issuecomment-191578085
  
    Before this patch, the generated code:
    
        /* 001 */ public Object generate(Object[] references) {
        /* 002 */   return new GeneratedIterator(references);
        /* 003 */ }
        /* 004 */
        /* 005 */ /** Codegened pipeline for:
        /* 006 */ * Sort [id#22L ASC], true, 0
        /* 007 */ +- INPUT
        /* 008 */ */
        /* 009 */ class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
        /* 010 */   private Object[] references;
        /* 011 */   private boolean sort_needToSort;
        /* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
        /* 013 */   private 
org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
        /* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
        /* 015 */   private scala.collection.Iterator<UnsafeRow> 
sort_sortedIter;
        /* 016 */   private scala.collection.Iterator inputadapter_input;
        /* 017 */   private UnsafeRow sort_result;
        /* 018 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sort_holder;
        /* 019 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
sort_rowWriter;
        /* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetric 
sort_dataSize;
        /* 021 */   private 
org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
        /* 022 */   private org.apache.spark.sql.execution.metric.LongSQLMetric 
sort_spillSize;
        /* 023 */   private 
org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
        /* 024 */
        /* 025 */   public GeneratedIterator(Object[] references) {
        /* 026 */     this.references = references;
        /* 027 */   }
        /* 028 */
        /* 029 */   public void init(scala.collection.Iterator inputs[]) {
        /* 030 */     sort_needToSort = true;
        /* 031 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) 
references[0];
        /* 032 */     sort_sorter = sort_plan.createSorter();
        /* 033 */     sort_metrics = 
org.apache.spark.TaskContext.get().taskMetrics();
        /* 034 */
        /* 035 */     inputadapter_input = inputs[0];
        /* 036 */     sort_result = new UnsafeRow(1);
        /* 037 */     this.sort_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sort_result, 0);
        /* 038 */     this.sort_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sort_holder, 1);
        /* 039 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
        /* 040 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
        /* 041 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
        /* 042 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
        /* 043 */   }
        /* 044 */
        /* 045 */   private void sort_addToSorter() throws java.io.IOException {
        /* 046 */     while (inputadapter_input.hasNext()) {
        /* 047 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
        /* 048 */       /* input[0, bigint] */
        /* 049 */       boolean inputadapter_isNull = 
inputadapter_row.isNullAt(0);
        /* 050 */       long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
        /* 051 */       // Convert the input attributes to an UnsafeRow and add 
it to the sorter
        /* 052 */
        /* 053 */       sort_rowWriter.zeroOutNullBytes();
        /* 054 */
        /* 055 */       if (inputadapter_isNull) {
        /* 056 */         sort_rowWriter.setNullAt(0);
        /* 057 */       } else {
        /* 058 */         sort_rowWriter.write(0, inputadapter_value);
        /* 059 */       }
        /* 060 */
        /* 061 */       sort_sorter.insertRow(sort_result);
        /* 062 */       if (shouldStop()) {
        /* 063 */         return;
        /* 064 */       }
        /* 065 */     }
        /* 066 */
        /* 067 */   }
        /* 068 */
        /* 069 */   protected void processNext() throws java.io.IOException {
        /* 070 */     if (sort_needToSort) {
        /* 071 */       sort_addToSorter();
        /* 072 */       Long sort_spillSizeBefore = 
sort_metrics.memoryBytesSpilled();
        /* 073 */       sort_sortedIter = sort_sorter.sort();
        /* 074 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
        /* 075 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() 
- sort_spillSizeBefore);
        /* 076 */       
sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
        /* 077 */       sort_needToSort = false;
        /* 078 */     }
        /* 079 */
        /* 080 */     while (sort_sortedIter.hasNext()) {
        /* 081 */       UnsafeRow sort_outputRow = 
(UnsafeRow)sort_sortedIter.next();
        /* 082 */       append(sort_outputRow.copy());
        /* 083 */       if (shouldStop()) return;
        /* 084 */     }
        /* 085 */   }
        /* 086 */ }



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to