Github user viirya commented on the pull request:
https://github.com/apache/spark/pull/11484#issuecomment-191578173
After this patch, the generated code is:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [id#22L ASC], true, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */ private Object[] references;
/* 011 */ private boolean sort_needToSort;
/* 012 */ private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */ private
org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */ private scala.collection.Iterator<UnsafeRow>
sort_sortedIter;
/* 016 */ private scala.collection.Iterator inputadapter_input;
/* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetric
sort_dataSize;
/* 018 */ private
org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 019 */ private org.apache.spark.sql.execution.metric.LongSQLMetric
sort_spillSize;
/* 020 */ private
org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 021 */
/* 022 */ public GeneratedIterator(Object[] references) {
/* 023 */ this.references = references;
/* 024 */ }
/* 025 */
/* 026 */ public void init(scala.collection.Iterator inputs[]) {
/* 027 */ sort_needToSort = true;
/* 028 */ this.sort_plan = (org.apache.spark.sql.execution.Sort)
references[0];
/* 029 */ sort_sorter = sort_plan.createSorter();
/* 030 */ sort_metrics =
org.apache.spark.TaskContext.get().taskMetrics();
/* 031 */
/* 032 */ inputadapter_input = inputs[0];
/* 033 */ this.sort_dataSize =
(org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 034 */ sort_metricValue =
(org.apache.spark.sql.execution.metric.LongSQLMetricValue)
sort_dataSize.localValue();
/* 035 */ this.sort_spillSize =
(org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 036 */ sort_metricValue1 =
(org.apache.spark.sql.execution.metric.LongSQLMetricValue)
sort_spillSize.localValue();
/* 037 */ }
/* 038 */
/* 039 */ private void sort_addToSorter() throws java.io.IOException {
/* 040 */ while (inputadapter_input.hasNext()) {
/* 041 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 042 */
/* 043 */
sort_sorter.insertRow((UnsafeRow)inputadapter_row.copy());
/* 044 */ if (shouldStop()) {
/* 045 */ return;
/* 046 */ }
/* 047 */ }
/* 048 */
/* 049 */ }
/* 050 */
/* 051 */ protected void processNext() throws java.io.IOException {
/* 052 */ if (sort_needToSort) {
/* 053 */ sort_addToSorter();
/* 054 */ Long sort_spillSizeBefore =
sort_metrics.memoryBytesSpilled();
/* 055 */ sort_sortedIter = sort_sorter.sort();
/* 056 */ sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 057 */ sort_metricValue1.add(sort_metrics.memoryBytesSpilled()
- sort_spillSizeBefore);
/* 058 */
sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 059 */ sort_needToSort = false;
/* 060 */ }
/* 061 */
/* 062 */ while (sort_sortedIter.hasNext()) {
/* 063 */ UnsafeRow sort_outputRow =
(UnsafeRow)sort_sortedIter.next();
/* 064 */ append(sort_outputRow.copy());
/* 065 */ if (shouldStop()) return;
/* 066 */ }
/* 067 */ }
/* 068 */ }
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]