Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17122#discussion_r104467009
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
    @@ -206,6 +206,21 @@ trait CodegenSupport extends SparkPlan {
       def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): 
String = {
         throw new UnsupportedOperationException
       }
    +
    +  /**
    +   * For optimization to suppress shouldStop() in a loop of 
WholeStageCodegen.
    +   * Returning true means we need to insert shouldStop() into the loop 
producing rows, if any.
    +   */
    +  def isShouldStopRequired: Boolean = {
    +    return shouldStopRequired && !(this.parent != null && 
!this.parent.isShouldStopRequired)
    +  }
    +
    +  /**
    +   * Set to false if this plan consumes all rows produced by children but 
doesn't output row
    +   * to buffer by calling append(), so the children don't require 
shouldStop()
    +   * in the loop of producing rows.
    +   */
    +  protected def shouldStopRequired: Boolean = true
    --- End diff --
    
    I cannot see performance improvement on Sort. I think there are two reasons 
for this result.
    
    One is that the loop body is too large. At the inner-most loop, a 
[`insertRow` 
method](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java#L107-L120)
 is called. I think that the size of this method is too large to facilitate 
loop optimizations.
    
    The other is that the hotspot method is not here. I guess that the hotspot 
method may be  `sort()` at line 154.
    
    Here is the generated code.
    ```java
    /* 114 */     while (true) {
    /* 115 */       long range_range = range_batchEnd - range_number;
    /* 116 */       if (range_range != 0L) {
    /* 117 */         int range_localEnd = (int)(range_range / -1L);
    /* 118 */         for (int range_localIdx = 0; range_localIdx < 
range_localEnd; range_localIdx++) {
    /* 119 */           long range_value = ((long)range_localIdx * -1L) + 
range_number;
    /* 120 */
    /* 121 */           range_rowWriter.write(0, range_value);
    /* 122 */           sort_sorter.insertRow((UnsafeRow)range_result);
    /* 123 */
    /* 124 */           // shouldStop check is eliminated
    /* 125 */         }
    /* 126 */         range_number = range_batchEnd;
    /* 127 */       }
    /* 128 */
    /* 129 */       if (range_taskContext.isInterrupted()) {
    /* 130 */         throw new TaskKilledException();
    /* 131 */       }
    /* 132 */
    /* 133 */       long range_nextBatchTodo;
    /* 134 */       if (range_numElementsTodo > 1000L) {
    /* 135 */         range_nextBatchTodo = 1000L;
    /* 136 */         range_numElementsTodo -= 1000L;
    /* 137 */       } else {
    /* 138 */         range_nextBatchTodo = range_numElementsTodo;
    /* 139 */         range_numElementsTodo = 0;
    /* 140 */         if (range_nextBatchTodo == 0) break;
    /* 141 */       }
    /* 142 */       range_numOutputRows.add(range_nextBatchTodo);
    /* 143 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
    /* 144 */
    /* 145 */       range_batchEnd += range_nextBatchTodo * -1L;
    /* 146 */     }
    /* 147 */
    /* 148 */   }
    /* 149 */
    /* 150 */   protected void processNext() throws java.io.IOException {
    /* 151 */     if (sort_needToSort) {
    /* 152 */       long sort_spillSizeBefore = 
sort_metrics.memoryBytesSpilled();
    /* 153 */       sort_addToSorter();
    /* 154 */       sort_sortedIter = sort_sorter.sort();
    /* 155 */       sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
    /* 156 */       sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
    /* 157 */       sort_spillSize.add(sort_metrics.memoryBytesSpilled() - 
sort_spillSizeBefore);
    /* 158 */       
sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
    /* 159 */       sort_needToSort = false;
    /* 160 */     }
    /* 161 */
    /* 162 */     while (sort_sortedIter.hasNext()) {
    /* 163 */       UnsafeRow sort_outputRow = 
(UnsafeRow)sort_sortedIter.next();
    /* 164 */
    /* 165 */       append(sort_outputRow);
    /* 166 */
    /* 167 */       if (shouldStop()) return;
    /* 168 */     }
    /* 169 */   }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to