Github user viirya commented on the pull request:
https://github.com/apache/spark/pull/11484#issuecomment-191578085
Before this patch, the generated code was:
// Whole-stage-codegen output for: Sort [id#22L ASC] over an INPUT adapter.
// Reconstructed from the mail-wrapped listing: lines are rejoined so no
// identifier is split across a line break; the /* NNN */ markers emitted by
// Spark's CodeGenerator are preserved. Tokens are otherwise unchanged.
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */  * Sort [id#22L ASC], true, 0
/* 007 */  +- INPUT
/* 008 */  */
/* 009 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean sort_needToSort;
/* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private UnsafeRow sort_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sort_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter sort_rowWriter;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 021 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 022 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 023 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 024 */
/* 025 */   public GeneratedIterator(Object[] references) {
/* 026 */     this.references = references;
/* 027 */   }
/* 028 */
/* 029 */   // Wires the operator state from the `references` array and the upstream input iterator.
/* 030 */   public void init(scala.collection.Iterator inputs[]) {
/* 031 */     sort_needToSort = true;
/* 032 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 033 */     sort_sorter = sort_plan.createSorter();
/* 034 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 035 */
/* 036 */     inputadapter_input = inputs[0];
/* 037 */     sort_result = new UnsafeRow(1);
/* 038 */     this.sort_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sort_result, 0);
/* 039 */     this.sort_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sort_holder, 1);
/* 040 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 041 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
/* 042 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 043 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
/* 044 */   }
/* 045 */
/* 046 */   // Drains the upstream iterator into the external sorter, one UnsafeRow at a time.
/* 047 */   private void sort_addToSorter() throws java.io.IOException {
/* 048 */     while (inputadapter_input.hasNext()) {
/* 049 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 050 */       /* input[0, bigint] */
/* 051 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 052 */       long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0));
/* 053 */       // Convert the input attributes to an UnsafeRow and add it to the sorter
/* 054 */
/* 055 */       sort_rowWriter.zeroOutNullBytes();
/* 056 */
/* 057 */       if (inputadapter_isNull) {
/* 058 */         sort_rowWriter.setNullAt(0);
/* 059 */       } else {
/* 060 */         sort_rowWriter.write(0, inputadapter_value);
/* 061 */       }
/* 062 */
/* 063 */       sort_sorter.insertRow(sort_result);
/* 064 */       if (shouldStop()) {
/* 065 */         return;
/* 066 */       }
/* 067 */     }
/* 068 */
/* 069 */   }
/* 070 */
/* 071 */   // On first call: consume all input, sort, and record data-size/spill metrics.
/* 072 */   // Afterwards: stream sorted rows downstream via append(), yielding at shouldStop().
/* 073 */   protected void processNext() throws java.io.IOException {
/* 074 */     if (sort_needToSort) {
/* 075 */       sort_addToSorter();
/* 076 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 077 */       sort_sortedIter = sort_sorter.sort();
/* 078 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 079 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 080 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 081 */       sort_needToSort = false;
/* 082 */     }
/* 083 */
/* 084 */     while (sort_sortedIter.hasNext()) {
/* 085 */       UnsafeRow sort_outputRow = (UnsafeRow) sort_sortedIter.next();
/* 086 */       append(sort_outputRow.copy());
/* 087 */       if (shouldStop()) return;
/* 088 */     }
/* 089 */   }
/* 090 */ }
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]