[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056716#comment-16056716 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/822

> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>              Labels: ready-to-commit
>             Fix For: 1.11.0
>
> Support gradual spilling memory to disk as the available memory gets too
> small to allow in memory work for the Hash Aggregate Operator.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
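The "gradual spilling" idea in the issue description can be illustrated with a minimal sketch: a partitioned hash aggregation that, whenever a memory budget is exceeded, picks the largest in-memory partition and spills it. This is a standalone illustration, not Drill's actual implementation; all class and field names here are hypothetical, and the "spill" is simulated with a log entry instead of a file write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of gradual spilling in a partitioned hash aggregation.
class GradualSpillSketch {
    static final int NUM_PARTITIONS = 4;
    final List<Map<String, Long>> partitions = new ArrayList<>();
    final List<String> spillLog = new ArrayList<>(); // stands in for writing a spill file
    final long memoryBudget; // max total groups held in memory (proxy for bytes)

    GradualSpillSketch(long memoryBudget) {
        this.memoryBudget = memoryBudget;
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            partitions.add(new HashMap<>());
        }
    }

    // Insert/aggregate one record; spill if the budget is now exceeded.
    void aggregate(String key, long value) {
        int p = Math.floorMod(key.hashCode(), NUM_PARTITIONS);
        partitions.get(p).merge(key, value, Long::sum);
        if (totalGroups() > memoryBudget) {
            spillLargestPartition();
        }
    }

    long totalGroups() {
        long n = 0;
        for (Map<String, Long> m : partitions) { n += m.size(); }
        return n;
    }

    // Spill only one partition at a time -- memory is released gradually,
    // and the remaining partitions keep aggregating in memory.
    void spillLargestPartition() {
        int victim = 0;
        for (int i = 1; i < NUM_PARTITIONS; i++) {
            if (partitions.get(i).size() > partitions.get(victim).size()) { victim = i; }
        }
        spillLog.add("spilled partition " + victim + " (" + partitions.get(victim).size() + " groups)");
        partitions.get(victim).clear(); // spilled groups get re-aggregated in a later pass
    }
}
```

Spilling one partition at a time (rather than everything) is the "gradual" part: only the memory actually needed is freed, and unspilled partitions still finish purely in memory.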
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055038#comment-16055038 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122863264

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
```
@@ -149,14 +149,24 @@ public IterOutcome innerNext() {
       if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; }
       // if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming
       incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed
-      if ( wasKilled ) { // if kill() was called before, then finish up
-        aggregator.cleanup();
-        incoming.kill(false);
-        return IterOutcome.NONE;
-      }
     }
-    AggOutcome out = aggregator.doWork();
+    if ( wasKilled ) { // if kill() was called before, then finish up
```
--- End diff --

Done
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055037#comment-16055037 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122863244

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
```
@@ -149,14 +149,24 @@ public IterOutcome innerNext() {
       if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; }
       // if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming
       incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed
-      if ( wasKilled ) { // if kill() was called before, then finish up
-        aggregator.cleanup();
-        incoming.kill(false);
-        return IterOutcome.NONE;
-      }
     }
-    AggOutcome out = aggregator.doWork();
+    if ( wasKilled ) { // if kill() was called before, then finish up
+      aggregator.cleanup();
+      incoming.kill(false);
+      return IterOutcome.NONE;
+    }
+
+    // Read and aggregate records
+    // ( may need to run again if the spilled partition that was read
+    //   generated new partitions that were all spilled )
+    AggOutcome out = AggOutcome.CALL_WORK_AGAIN;
+    while ( out == AggOutcome.CALL_WORK_AGAIN) {
+      //
+      // Read incoming batches and process their records
+      //
+      out = aggregator.doWork();
+    }
```
--- End diff --

Done (do while ...)
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055009#comment-16055009 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122858748

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
--- End diff --

Scratch that, I see you need the value of "out". So:

```
AggOutcome out;
do {
  //
  // Read incoming batches and process their records
  //
  out = aggregator.doWork();
} while (out == AggOutcome.CALL_WORK_AGAIN);
```

Or even:

```
// Read incoming batches and process their records
AggOutcome out;
while ((out = aggregator.doWork()) == AggOutcome.CALL_WORK_AGAIN) {
  // Nothing to do
}
```
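Both suggested shapes terminate the loop on the first non-CALL_WORK_AGAIN value while keeping that final value in `out`. A runnable sketch of the second form, using a hypothetical stub in place of Drill's HashAggregator (the stub and its constructor argument are invented for illustration):

```java
// Sketch of the drive loop under discussion: call doWork() repeatedly while it
// asks to be called again, and keep the final outcome.
class DriveWorkLoop {
    enum AggOutcome { CALL_WORK_AGAIN, RETURN_OUTCOME }

    // Hypothetical stand-in for the real aggregator: each doWork() call
    // "consumes" one spilled partition whose contents were all re-spilled,
    // before any real output can be produced.
    static class StubAggregator {
        private int pendingSpilledPartitions;
        StubAggregator(int pendingSpilledPartitions) {
            this.pendingSpilledPartitions = pendingSpilledPartitions;
        }
        AggOutcome doWork() {
            return (pendingSpilledPartitions-- > 0)
                ? AggOutcome.CALL_WORK_AGAIN
                : AggOutcome.RETURN_OUTCOME;
        }
    }

    static AggOutcome drive(StubAggregator aggregator) {
        AggOutcome out;
        // Read incoming batches and process their records; run again while the
        // spilled partition just read produced only re-spilled partitions.
        while ((out = aggregator.doWork()) == AggOutcome.CALL_WORK_AGAIN) {
            // Nothing to do: doWork() performs all the processing.
        }
        return out;
    }
}
```

The assignment-in-condition keeps `out` available after the loop, which is exactly why the plain `while (aggregator.doWork() == ...)` form suggested earlier in the thread does not suffice here.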
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054994#comment-16054994 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122858065

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
--- End diff --

```
while (aggregator.doWork() == AggOutcome.CALL_WORK_AGAIN) {
  // Nothing to do
}
```

? In one of your reviews you said you didn't like empty loops, but sometimes they are handy...
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054995#comment-16054995 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122858048

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
--- End diff --

Spaces, here and below.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051150#comment-16051150 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122324232

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
```
@@ -130,6 +127,7 @@
   private int currentIndex = 0;
   private IterOutcome outcome;
   private int numGroupedRecords = 0;
+  private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
```
--- End diff --

Thanks for the explanation.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051148#comment-16051148 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122324618

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
```
@@ -512,122 +509,122 @@ private void updateEstMaxBatchSize(RecordBatch incoming) {
     }
   }

+  /**
+   * Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
+   * Once complete, get the incoming NEXT batch and process it as well, etc.
+   * For 1st phase, may return when an early output needs to be performed.
+   *
+   * @return Agg outcome status
+   */
   @Override
   public AggOutcome doWork() {
-    try {
-      // Note: Keeping the outer and inner try blocks here to maintain some similarity with
-      // StreamingAggregate which does somethings conditionally in the outer try block.
-      // In the future HashAggregate may also need to perform some actions conditionally
-      // in the outer try block.
-
-      assert ! handlingSpills || currentIndex < Integer.MAX_VALUE;
-      outside:
-      while (true) {
+    while (true) {

-        // This would be called only once - after actual data arrives on incoming
-        if ( schema == null && incoming.getRecordCount() > 0 ) {
-          this.schema = incoming.getSchema();
-          // Calculate the number of partitions based on actual incoming data
-          delayedSetup();
-        }
+      // This would be called only once - first time actual data arrives on incoming
+      if ( schema == null && incoming.getRecordCount() > 0 ) {
+        this.schema = incoming.getSchema();
+        currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
+        // Calculate the number of partitions based on actual incoming data
+        delayedSetup();
+      }

-        // loop through existing records, aggregating the values as necessary.
-        if (EXTRA_DEBUG_1) {
-          logger.debug("Starting outer loop of doWork()...");
+      //
+      // loop through existing records in this batch, aggregating the values as necessary.
+      //
+      if (EXTRA_DEBUG_1) {
+        logger.debug("Starting outer loop of doWork()...");
+      }
+      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+        if (EXTRA_DEBUG_2) {
+          logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
         }
-        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if (EXTRA_DEBUG_2) {
-            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
-          }
-          checkGroupAndAggrValues(currentIndex);
-          // If adding a group discovered a memory pressure during 1st phase, then start
-          // outputing some partition to free memory.
-          if ( earlyOutput ) {
-            outputCurrentBatch();
-            incIndex(); // next time continue with the next incoming row
-            return AggOutcome.RETURN_OUTCOME;
-          }
+        checkGroupAndAggrValues(currentIndex);
+        // If adding a group discovered a memory pressure during 1st phase, then start
+        // outputing some partition downstream in order to free memory.
+        if ( earlyOutput ) {
+          outputCurrentBatch();
+          incIndex(); // next time continue with the next incoming row
+          return AggOutcome.RETURN_OUTCOME;
         }
+      }

-        if (EXTRA_DEBUG_1) {
-          logger.debug("Processed {} records", underlyingIndex);
+      if (EXTRA_DEBUG_1) {
+        logger.debug("Processed {} records", underlyingIndex);
+      }

+      // Cleanup the previous batch since we are done processing it.
+      for (VectorWrapper v : incoming) {
+        v.getValueVector().clear();
+      }
+      //
+      // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
+      // from one of the spill files (The spill case is handled differently here to avoid
+      // collecting stats on the spilled records)
+      //
+      if ( handlingSpills ) {
+        outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
+      } else {
+        long beforeAlloc = allocator.getAllocatedMemory();

+        // Get the next
```
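The control flow shown in the hunk above (iterate the current batch, aggregate each record, return early under memory pressure, and resume at the next row on the following call) can be sketched in isolation. This is an illustrative stand-in, not Drill's API: the batch is an `int[]`, and a predicate simulates `checkGroupAndAggrValues()` signaling memory pressure.

```java
import java.util.function.IntPredicate;

// Sketch of doWork()'s per-batch loop: resumable iteration with early output.
class BatchProcessSketch {
    enum AggOutcome { RETURN_OUTCOME, BATCH_DONE }

    int underlyingIndex = 0;      // position within the current batch; survives early returns
    final int[] batch;            // stand-in for the incoming record batch
    final IntPredicate aggregateRecord; // returns true when memory pressure forces early output

    BatchProcessSketch(int[] batch, IntPredicate aggregateRecord) {
        this.batch = batch;
        this.aggregateRecord = aggregateRecord;
    }

    AggOutcome processCurrentBatch() {
        int currentBatchRecordCount = batch.length; // read once, not per record
        for (; underlyingIndex < currentBatchRecordCount; underlyingIndex++) {
            boolean earlyOutput = aggregateRecord.test(batch[underlyingIndex]);
            if (earlyOutput) {
                underlyingIndex++;          // next call continues with the next incoming row
                return AggOutcome.RETURN_OUTCOME;
            }
        }
        return AggOutcome.BATCH_DONE;       // whole batch consumed; fetch the next one
    }
}
```

Because `underlyingIndex` is a field rather than a loop-local, a call interrupted by early output picks up exactly where the previous one left off, which is the property the diff's `for (; underlyingIndex < currentBatchRecordCount; incIndex())` loop relies on.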
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051149#comment-16051149 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122324389

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
--- End diff --

If this is original code, then clearly it does the right thing else we'd have seen problems by now. So, fine to leave as-is.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051144#comment-16051144 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122324273

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051138#comment-16051138 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122323606

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051133#comment-16051133 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122322345

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
--- End diff --

Mmmm... this is the original Hash Agg code. Hence probably the two invariants are met.
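The cleanup loop debated in the two comments above (`for (VectorWrapper v : incoming) v.getValueVector().clear();`) releases the just-consumed batch's buffers before the next batch is fetched. A minimal self-contained sketch of that pattern, with invented `Buffer` and `clearAll` names standing in for Drill's VectorWrapper/ValueVector:

```java
// Sketch of per-batch buffer cleanup: release every buffer of the batch we
// just finished processing, so memory does not accumulate across batches.
class BatchCleanupSketch {
    static class Buffer {
        boolean released;
        void clear() { released = true; } // stands in for freeing the vector's memory
    }

    // Mirrors: for (VectorWrapper v : incoming) { v.getValueVector().clear(); }
    static int clearAll(Buffer[] incoming) {
        int cleared = 0;
        for (Buffer b : incoming) {
            if (!b.released) {
                b.clear();
                cleared++;
            }
        }
        return cleared;
    }
}
```

The invariant the reviewers allude to is that by this point every buffer in `incoming` is owned by the operator and no longer referenced downstream, so clearing all of them unconditionally is safe.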
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051112#comment-16051112 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r122321138 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -154,7 +152,7 @@ private int cycleNum = 0; // primary, secondary, tertiary, etc. private int originalPartition = -1; // the partition a secondary reads from - private class SpilledPartition { public int spilledBatches; public String spillFile /* Path filePath */; int cycleNum; int origPartn; int prevOrigPartn; } + private class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; } --- End diff -- Done, thanks. > Support Spill to Disk for the Hash Aggregate Operator > - > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators >Affects Versions: 1.10.0 >Reporter: Boaz Ben-Zvi >Assignee: Boaz Ben-Zvi > Fix For: 1.11.0 > > > Support gradual spilling memory to disk as the available memory gets too > small to allow in memory work for the Hash Aggregate Operator. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051107#comment-16051107 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r122320539 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -130,6 +127,7 @@ private int currentIndex = 0; private IterOutcome outcome; private int numGroupedRecords = 0; + private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount() --- End diff -- The getRecordCount() virtual method is called **for each record**! And in some cases this method performs several checks. Unfortunately, other inefficiencies dwarf these savings. A local variable won't work, as execution may return and come back (e.g., on a spill) midway through processing the incoming batch.
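The trade-off debated above (a cached field versus a per-row virtual call) can be sketched outside of Drill. The `RecordBatch` interface and `BatchProcessor` names below are illustrative assumptions, not Drill's actual classes; the point is only that a field, unlike a local variable, survives an early return from the processing loop:

```java
// Hypothetical sketch (not Drill's actual code) of the pattern under
// discussion: cache the batch's record count in a field once per batch
// instead of calling the virtual getRecordCount() on every row. A field,
// unlike a local variable, keeps its value across an early return from
// the processing loop -- e.g. when the operator returns mid-batch to spill.
public class BatchProcessor {
  interface RecordBatch { int getRecordCount(); }

  private int currentBatchRecordCount; // cached once per incoming batch
  private int underlyingIndex;         // resume point within the batch
  private int processed;

  /** Process up to maxRows rows; returns true once the batch is exhausted.
   *  Stopping early simulates the spill path returning to the caller. */
  boolean processSome(RecordBatch incoming, int maxRows) {
    if (underlyingIndex == 0) {
      currentBatchRecordCount = incoming.getRecordCount(); // one virtual call per batch
    }
    for (int budget = maxRows;
         underlyingIndex < currentBatchRecordCount && budget > 0;
         underlyingIndex++, budget--) {
      processed++; // stand-in for per-row aggregation work
    }
    return underlyingIndex >= currentBatchRecordCount;
  }

  int processedCount() { return processed; }

  public static void main(String[] args) {
    BatchProcessor p = new BatchProcessor();
    RecordBatch tenRows = () -> 10;
    p.processSome(tenRows, 4);                  // early return after 4 rows
    boolean done = p.processSome(tenRows, 100); // resume and finish the batch
    System.out.println(done + " " + p.processedCount());
  }
}
```

Because `underlyingIndex` and the cached count are fields, the caller can leave mid-batch (as the spill path does) and resume later without re-invoking `getRecordCount()`.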
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051078#comment-16051078 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r122313161 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051080#comment-16051080 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r122310219 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -130,6 +127,7 @@ private int currentIndex = 0; private IterOutcome outcome; private int numGroupedRecords = 0; + private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount() --- End diff -- Not sure that the very small savings in time are worth the complexity of keeping a cached copy in sync. If needed for an inner loop, can it be a local variable instead?
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051081#comment-16051081 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r122312536 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -512,122 +509,122 @@ private void updateEstMaxBatchSize(RecordBatch incoming) { } } + /** + * Read and process (i.e., insert into the hash table and aggregate) records from the current batch. + * Once complete, get the incoming NEXT batch and process it as well, etc. + * For 1st phase, may return when an early output needs to be performed. + * + * @return Agg outcome status + */ @Override public AggOutcome doWork() { -try { - // Note: Keeping the outer and inner try blocks here to maintain some similarity with - // StreamingAggregate which does somethings conditionally in the outer try block. - // In the future HashAggregate may also need to perform some actions conditionally - // in the outer try block. - - assert ! handlingSpills || currentIndex < Integer.MAX_VALUE; - outside: - while (true) { +while (true) { -// This would be called only once - after actual data arrives on incoming -if ( schema == null && incoming.getRecordCount() > 0 ) { - this.schema = incoming.getSchema(); - // Calculate the number of partitions based on actual incoming data - delayedSetup(); -} + // This would be called only once - first time actual data arrives on incoming + if ( schema == null && incoming.getRecordCount() > 0 ) { +this.schema = incoming.getSchema(); +currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch +// Calculate the number of partitions based on actual incoming data +delayedSetup(); + } -// loop through existing records, aggregating the values as necessary. 
-if (EXTRA_DEBUG_1) { - logger.debug("Starting outer loop of doWork()..."); + // + // loop through existing records in this batch, aggregating the values as necessary. + // + if (EXTRA_DEBUG_1) { +logger.debug("Starting outer loop of doWork()..."); + } + for (; underlyingIndex < currentBatchRecordCount; incIndex()) { +if (EXTRA_DEBUG_2) { + logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); } -for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if (EXTRA_DEBUG_2) { -logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); - } - checkGroupAndAggrValues(currentIndex); - // If adding a group discovered a memory pressure during 1st phase, then start - // outputing some partition to free memory. - if ( earlyOutput ) { -outputCurrentBatch(); -incIndex(); // next time continue with the next incoming row -return AggOutcome.RETURN_OUTCOME; - } +checkGroupAndAggrValues(currentIndex); +// If adding a group discovered a memory pressure during 1st phase, then start +// outputing some partition downstream in order to free memory. +if ( earlyOutput ) { + outputCurrentBatch(); + incIndex(); // next time continue with the next incoming row + return AggOutcome.RETURN_OUTCOME; } + } + + if (EXTRA_DEBUG_1) { +logger.debug("Processed {} records", underlyingIndex); + } -if (EXTRA_DEBUG_1) { - logger.debug("Processed {} records", underlyingIndex); + // Cleanup the previous batch since we are done processing it. + for (VectorWrapper v : incoming) { --- End diff -- Two comments/questions here. First, can we be sure that there is always a Selection Vector Remover between the hash agg and anything that can emit a batch that uses an SV4 (such as sort)? Otherwise, the `getValueVector()` method won't work. Second, is it certain that no other code references the vectors from this container? 
If both those invariants are met, then, yes, it is the job of this operator to release buffers created by the upstream.
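The ownership rule being discussed (the operator that finishes consuming a batch releases the buffers its upstream allocated, provided nothing else still holds them) follows the usual reference-counting convention. A Drill-independent sketch, with hypothetical `Buffer`/`Batch` names rather than Drill's actual vector classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the ownership rule discussed above: the downstream
// operator that is done with a batch releases each buffer, and the memory is
// actually freed only when no other holder has retained it.
public class BufferOwnershipDemo {
  static class Buffer {
    private int refCount = 1;          // the upstream's allocation holds one ref
    void retain()  { refCount++; }     // another operator keeping a reference
    void release() { refCount--; }     // freed for real once refCount hits 0
    boolean isFreed() { return refCount == 0; }
  }

  static class Batch {
    final List<Buffer> buffers = new ArrayList<>();
  }

  /** Downstream operator done processing the batch: release every buffer. */
  static void clearBatch(Batch b) {
    for (Buffer buf : b.buffers) { buf.release(); }
    b.buffers.clear();
  }

  public static void main(String[] args) {
    Batch batch = new Batch();
    Buffer buf = new Buffer();
    batch.buffers.add(buf);
    clearBatch(batch);   // safe only if no other operator retained buf
    System.out.println(buf.isFreed());
  }
}
```

The reviewer's two questions map directly onto this model: an SV4-producing upstream without a Selection Vector Remover changes how the buffers are reached, and any other holder of the vectors would correspond to an un-released `retain()`.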
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051079#comment-16051079 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r122312907 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051077#comment-16051077 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r122310373 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -154,7 +152,7 @@ private int cycleNum = 0; // primary, secondary, tertiary, etc. private int originalPartition = -1; // the partition a secondary reads from - private class SpilledPartition { public int spilledBatches; public String spillFile /* Path filePath */; int cycleNum; int origPartn; int prevOrigPartn; } + private class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; } --- End diff -- `private static class` since you don't have any methods and so have no use for the "inner this" pointer.
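The suggestion above can be demonstrated in isolation: a non-static inner class carries a hidden reference to its enclosing instance (a compiler-generated synthetic field, typically `this$0`), which a `static` nested class avoids. A minimal, Drill-independent illustration:

```java
import java.lang.reflect.Field;

public class NestedClassDemo {
  // Non-static inner class: the compiler adds a synthetic field that
  // references the enclosing NestedClassDemo instance.
  class Inner { int x; }

  // Static nested class: a plain data holder with no hidden outer pointer --
  // what the review suggests for SpilledPartition.
  static class Nested { int x; }

  /** True if the class declares any compiler-generated (synthetic) field,
   *  such as the inner class's reference to its enclosing instance. */
  static boolean hasOuterReference(Class<?> c) {
    for (Field f : c.getDeclaredFields()) {
      if (f.isSynthetic()) { return true; }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println("Inner has outer ref:  " + hasOuterReference(Inner.class));
    System.out.println("Nested has outer ref: " + hasOuterReference(Nested.class));
  }
}
```

Besides dropping the pointless outer reference, making such a holder `static` means instances can outlive the operator that created them without pinning it in memory, which matters for objects queued on a list the way `SpilledPartition` is.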
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035771#comment-16035771 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119977034 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); +} + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { +SpilledPartition sp = spilledPartitionsList.remove(0); +try { + spillSet.delete(sp.spillFile); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); +} + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } -if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { +assert htables[part] != null; +htables[part].reset(); +if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } +batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received. } -// private final AggOutcome setOkAndReturn() { -//this.outcome = IterOutcome.OK; -//for (VectorWrapper v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -//} -//return AggOutcome.RETURN_OUTCOME; -// } private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } -currentIndex = getVectorIndex(underlyingIndex); +try { currentIndex = getVectorIndex(underlyingIndex); } +catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} } private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private void addBatchHolder() { + private boolean isSpilled(int part) { +return outputStream[part] != null; + } + /** + * Which partition to choose for flushing out (i.e. spill or return) ? 
+ * - The current partition (to which a new bach holder is added) has a priority, + * because its last batch holder is full. + * - Also the largest prior spilled partition has some priority, as it is already spilled; + * but spilling too few rows (e.g. a single batch) gets us nothing. + * - So the largest non-spilled partition has some priority, to get more memory freed. + * Need to weigh the above three options. + * + * @param currPart - The partition that hit the memory limit (gets a priority) + * @return The partition (number) chosen to be spilled + */ + private int chooseAPartitionToFlush(int
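The weighing described in that javadoc (priority to the current partition whose last batch holder is full, some priority to the largest already-spilled partition unless it holds too few rows, otherwise the largest in-memory partition) can be sketched as a standalone heuristic. The weights and names below are illustrative assumptions, not Drill's actual `chooseAPartitionToFlush` implementation:

```java
// Illustrative sketch of a "choose a partition to flush" heuristic like the
// one described in the javadoc above. Weights are assumptions for clarity:
// the current partition gets a 2x bonus (its last batch holder is full),
// already-spilled partitions get a 1.5x bonus (spilling them again is cheap)
// but are skipped when they hold too few rows to be worth a write.
public class FlushChooser {
  static int choose(int currPart, int[] rowsInMemory, boolean[] spilled, int minRowsToSpill) {
    int best = currPart;
    long bestScore = 2L * rowsInMemory[currPart]; // current partition's bonus
    for (int p = 0; p < rowsInMemory.length; p++) {
      if (p == currPart) { continue; }
      long score = rowsInMemory[p];
      if (spilled[p]) {
        if (rowsInMemory[p] < minRowsToSpill) { continue; } // not worth spilling
        score = (long) (1.5 * rowsInMemory[p]);             // already-spilled bonus
      }
      if (score > bestScore) { best = p; bestScore = score; }
    }
    return best; // the partition to spill (or return early)
  }

  public static void main(String[] args) {
    int[] rows = {100, 5000, 300, 900};
    boolean[] spilled = {false, false, true, false};
    // Partition 1 dominates: it frees the most memory despite partition 0's bonus.
    System.out.println(FlushChooser.choose(0, rows, spilled, 256));
  }
}
```

The design tension the javadoc names is visible here: spilling the current partition avoids fragmenting a fresh one, re-spilling an already-spilled partition keeps the number of spill files down, but only the largest partition maximizes the memory actually freed.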
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035741#comment-16035741 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119975302 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035735#comment-16035735 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119975022 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035725#comment-16035725 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119974724 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); +} + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { +SpilledPartition sp = spilledPartitionsList.remove(0); +try { + spillSet.delete(sp.spillFile); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); +} + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } -if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { +assert htables[part] != null; +htables[part].reset(); +if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } +batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received. } -// private final AggOutcome setOkAndReturn() { -//this.outcome = IterOutcome.OK; -//for (VectorWrapper v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -//} -//return AggOutcome.RETURN_OUTCOME; -// } private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } -currentIndex = getVectorIndex(underlyingIndex); +try { currentIndex = getVectorIndex(underlyingIndex); } +catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} } private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private void addBatchHolder() { + private boolean isSpilled(int part) { +return outputStream[part] != null; + } + /** + * Which partition to choose for flushing out (i.e. spill or return) ? 
+ * - The current partition (to which a new batch holder is added) has a priority, + * because its last batch holder is full. + * - Also the largest prior spilled partition has some priority, as it is already spilled; + * but spilling too few rows (e.g. a single batch) gets us nothing. + * - So the largest non-spilled partition has some priority, to get more memory freed. + * Need to weigh the above three options. + * + * @param currPart - The partition that hit the memory limit (gets a priority) + * @return The partition (number) chosen to be spilled + */ + private int chooseAPartitionToFlush(int
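The three-way preference that the javadoc above describes -- the current partition first, then the largest already-spilled partition (if it holds enough to be worth spilling again), then the largest non-spilled partition -- can be sketched as a small standalone heuristic. This is a hedged illustration only: the class name, the +1 priority bonuses, and the minimum-size threshold are assumptions, not Drill's actual `chooseAPartitionToFlush()` code.

```java
// Hypothetical sketch of the partition-selection heuristic described above.
// batchCount[p] is the number of in-memory batch holders in partition p;
// isSpilled[p] says whether p already has a spill file open.
class FlushChooser {
    static int choosePartitionToFlush(int currPart, int[] batchCount, boolean[] isSpilled) {
        int best = currPart;
        // The current partition gets a head start: its last batch holder is full.
        int bestScore = batchCount[currPart] + 1;
        for (int p = 0; p < batchCount.length; p++) {
            if (p == currPart) { continue; }
            int score = batchCount[p];
            if (isSpilled[p]) {
                // Already spilled, so appending is cheap -- but spilling a
                // single batch frees almost nothing, so skip tiny partitions.
                if (score < 2) { continue; }
                score += 1; // small bonus for being spilled already
            }
            if (score > bestScore) { best = p; bestScore = score; }
        }
        return best;
    }

    public static void main(String[] args) {
        // Partition 1 holds far more batches than current partition 0, so it
        // wins; the spilled partition 2 is too small to be worth flushing.
        System.out.println(choosePartitionToFlush(0,
                new int[]{3, 10, 1}, new boolean[]{false, false, true})); // prints 1
    }
}
```

The point of weighing rather than always picking the current partition is that flushing the largest partition frees the most memory per spill file created.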
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035440#comment-16035440 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119954158 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); --- End diff -- Concurrent open files: While spilling, there is one per each (non-pristine) spilling partition (yes, can be as high as 16, or even 32). Afterwards, they are all closed; then for reading, each one gets opened; and though we process one partition at a time, closing of all is postponed to the end, as the processing code is unaware that the "incoming" actually comes from a spill file. About the limits: Seems that current defaults (e.g. 64K open files per process) can serve us well for the foreseeable future. Intel just announced the i9, where the top of the line CPU has 18 cores. Hence 1000s of concurrent active same-process threads are not feasible anytime soon (think about context switching). 
> Support Spill to Disk for the Hash Aggregate Operator > - > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators >Affects Versions: 1.10.0 >Reporter: Boaz Ben-Zvi >Assignee: Boaz Ben-Zvi > Fix For: 1.11.0 > > > Support gradual spilling memory to disk as the available memory gets too > small to allow in memory work for the Hash Aggregate Operator. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035385#comment-16035385 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119947814 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); --- End diff -- This is HashAgg closing time, so GC probably does it; anyway won't hurt -- added a close call.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033872#comment-16033872 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119749313 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -323,38 +717,32 @@ public AggOutcome doWork() { if (EXTRA_DEBUG_1) { logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); } -//newSchema = true; this.cleanup(); // TODO: new schema case needs to be handled appropriately return AggOutcome.UPDATE_AGGREGATOR; case OK: resetIndex(); -if (incoming.getRecordCount() == 0) { - continue; -} else { - checkGroupAndAggrValues(currentIndex); - incIndex(); - - if (EXTRA_DEBUG_1) { -logger.debug("Continuing outside loop"); - } - continue outside; + +if (EXTRA_DEBUG_1) { + logger.debug("Continuing outside loop"); } +continue outside; case NONE: -// outcome = out; +underlyingIndex = 0; // in case need to handle a spilled partition +try { currentIndex = getVectorIndex(underlyingIndex); } +catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} --- End diff -- Done -- all SCE catchers now throw either UnsupportedOperationException or (when clearly not possible) IllegalStateException
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033860#comment-16033860 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119748791 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -306,14 +685,29 @@ public AggOutcome doWork() { while (true) { // Cleanup the previous batch since we are done processing it. +long pre = allocator.getAllocatedMemory(); for (VectorWrapper v : incoming) { v.getValueVector().clear(); } +long beforeAlloc = allocator.getAllocatedMemory(); + +// Get the next RecordBatch from the incoming IterOutcome out = outgoing.next(0, incoming); + +// If incoming batch is bigger than our estimate - adjust the estimate +long afterAlloc = allocator.getAllocatedMemory(); +long incomingBatchSize = afterAlloc - beforeAlloc; +if ( /* ! handlingSpills && */ estMaxBatchSize < incomingBatchSize ) { + logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}",incomingBatchSize,estMaxBatchSize); + estMaxBatchSize = incomingBatchSize; +} + if (EXTRA_DEBUG_1) { logger.debug("Received IterOutcome of {}", out); } switch (out) { + case RESTART: +logger.warn("HASH AGG: doWork got a RESTART..."); --- End diff -- This code (and the RESTART state) was eliminated
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033854#comment-16033854 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119748615 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -306,14 +685,29 @@ public AggOutcome doWork() { while (true) { // Cleanup the previous batch since we are done processing it. +long pre = allocator.getAllocatedMemory(); for (VectorWrapper v : incoming) { v.getValueVector().clear(); } +long beforeAlloc = allocator.getAllocatedMemory(); + --- End diff -- The doWork() function's basic outline is the same as before; this code change just peppered a few lines of code in a few places. I was trying to minimize the change.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033857#comment-16033857 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119748684 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -306,14 +685,29 @@ public AggOutcome doWork() { while (true) { // Cleanup the previous batch since we are done processing it. +long pre = allocator.getAllocatedMemory(); for (VectorWrapper v : incoming) { v.getValueVector().clear(); } +long beforeAlloc = allocator.getAllocatedMemory(); + +// Get the next RecordBatch from the incoming IterOutcome out = outgoing.next(0, incoming); + +// If incoming batch is bigger than our estimate - adjust the estimate +long afterAlloc = allocator.getAllocatedMemory(); +long incomingBatchSize = afterAlloc - beforeAlloc; +if ( /* ! handlingSpills && */ estMaxBatchSize < incomingBatchSize ) { + logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}",incomingBatchSize,estMaxBatchSize); + estMaxBatchSize = incomingBatchSize; +} + if (EXTRA_DEBUG_1) { logger.debug("Received IterOutcome of {}", out); } switch (out) { + case RESTART: --- End diff -- RESTART was eliminated ...
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033838#comment-16033838 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119747147 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -285,8 +648,18 @@ public AggOutcome doWork() { // In the future HashAggregate may also need to perform some actions conditionally // in the outer try block. + assert ! handlingSpills || currentIndex < Integer.MAX_VALUE; + outside: while (true) { + +// This would be called only once - after actual data arrives on incoming +if ( schema == null && incoming.getRecordCount() > 0 ) { + this.schema = incoming.getSchema(); + // Calculate the number of partitions based on actual incoming data + delayedSetup(); +} + --- End diff -- Annotate history points to @amansinha100 as the creator. And the trace-level logging is already used heavily for "normal monitoring" (e.g., number of partitions chosen, size estimates). We need some new levels below trace, which unfortunately cannot be created in Logback.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033833#comment-16033833 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119745935 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -285,8 +648,18 @@ public AggOutcome doWork() { // In the future HashAggregate may also need to perform some actions conditionally // in the outer try block. + assert ! handlingSpills || currentIndex < Integer.MAX_VALUE; + outside: while (true) { + +// This would be called only once - after actual data arrives on incoming +if ( schema == null && incoming.getRecordCount() > 0 ) { --- End diff -- There is no code for OK_FIRST_NON_EMPTY; and the local field "schema" is used here as a flag to note "setup not yet performed" (not always matched with OK_NEW_SCHEMA; sometimes the second batch, with an OK, is the first non-empty batch). And next() is a FINAL method (in AbstractRecordBatch), which in turn invokes the other next() methods of the other classes extending RecordBatch (like the new SpilledRecordBatch). Should we put the code to perform the delayed setup for the HashAgg there? Even if next() is modified to return a new flag like OK_FIRST_NON_EMPTY -- these flags are checked in the code below, from the second batch on. Not sure where the code reading the first incoming batch is.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033757#comment-16033757 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119736678 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; +spilledPartitionsList = new ArrayList(); + +plannedBatches = numPartitions; // each partition should allocate its first batch + +// initialize every (per partition) entry in the arrays +for (int i = 0; i < numPartitions; i++ ) { + try { +this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); +this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (IllegalStateException ise) {} // ignore + catch (Exception e) { throw new DrillRuntimeException(e); }
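The partition-count math in the quoted `delayedSetup()` -- round the configured count up to a power of two, then halve it while `estMaxBatchSize * minBatchesPerPartition` plus a fixed per-partition overhead exceeds the available memory -- can be isolated into a small sketch. The class and method names here are illustrative; the 8 MB overhead constant mirrors the one in the patch.

```java
// Standalone sketch of the delayedSetup() partition-count logic quoted above.
// Assumed names; only the rounding loop and the 8 MB overhead follow the patch.
class PartitionPlanner {
    static final long OVERHEAD_PER_PARTITION = 8L * 1024 * 1024;

    // Same increment loop as the patch: bump n until only one bit is set.
    static int roundUpToPowerOfTwo(int n) {
        while (Integer.bitCount(n) > 1) { n++; }
        return n;
    }

    static int choosePartitionCount(int configured, long estMaxBatchSize,
                                    int minBatchesPerPartition, long memAvail) {
        int numPartitions = roundUpToPowerOfTwo(configured);
        // Halve until the planned partitions fit in the memory budget, never
        // going below one partition (the patch additionally disables spilling
        // when a 2nd phase cannot keep at least 2 partitions).
        while (numPartitions >= 2
                && numPartitions * (estMaxBatchSize * minBatchesPerPartition
                        + OVERHEAD_PER_PARTITION) > memAvail) {
            numPartitions /= 2;
        }
        return numPartitions;
    }

    public static void main(String[] args) {
        // 32 requested partitions, 1 MB batches, 3 batches each, 256 MB budget:
        // 32 * (3 MB + 8 MB) = 352 MB > 256 MB, so halve once down to 16.
        System.out.println(choosePartitionCount(32, 1L << 20, 3,
                256L * 1024 * 1024)); // prints 16
    }
}
```

Keeping the count a power of two is what makes the mask-and-bit-count partitioning (`partitionMask`, `bitsInMask`) in the quoted code valid.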
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032355#comment-16032355 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119519134 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; +spilledPartitionsList = new ArrayList(); + +plannedBatches = numPartitions; // each partition should allocate its first batch + +// initialize every (per partition) entry in the arrays +for (int i = 0; i < numPartitions; i++ ) { + try { +this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); +this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (IllegalStateException ise) {} // ignore + catch (Exception e) { throw new DrillRuntimeException(e); }
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032350#comment-16032350 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119518393 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; +spilledPartitionsList = new ArrayList(); + +plannedBatches = numPartitions; // each partition should allocate its first batch + +// initialize every (per partition) entry in the arrays +for (int i = 0; i < numPartitions; i++ ) { + try { +this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); +this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (IllegalStateException ise) {} // ignore + catch (Exception e) { throw new DrillRuntimeException(e); }
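The quoted `delayedSetup()` raises any configured partition count to the next power of two by incrementing until only one bit is set. A minimal sketch of an equivalent helper (hypothetical name, not Drill code) using bit operations instead of the increment loop:

```java
public class PartitionRounding {
    // Round n up to the next power of two; equivalent to the quoted
    // "while (Integer.bitCount(numPartitions) > 1) numPartitions++;" loop,
    // but in O(1) instead of counting up one at a time.
    static int roundUpToPowerOfTwo(int n) {
        if (Integer.bitCount(n) <= 1) {
            return n; // already a power of two (or 0/1)
        }
        return Integer.highestOneBit(n) << 1;
    }

    public static void main(String[] args) {
        System.out.println(roundUpToPowerOfTwo(24)); // prints 32
    }
}
```

The power-of-two requirement matters because `partitionMask = numPartitions - 1` later selects a partition with a single AND over the hash value.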
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032348#comment-16032348 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119518069 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; +spilledPartitionsList = new ArrayList(); + +plannedBatches = numPartitions; // each partition should allocate its first batch + +// initialize every (per partition) entry in the arrays +for (int i = 0; i < numPartitions; i++ ) { + try { +this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); +this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (IllegalStateException ise) {} // ignore + catch (Exception e) { throw new DrillRuntimeException(e); }
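The adjustment loop quoted above halves the partition count until the planned footprint fits in available memory. A self-contained sketch of that sizing logic (the 8 MiB overhead constant and sample numbers are stand-ins, not Drill's actual estimates):

```java
public class PartitionSizing {
    // Hypothetical stand-in for the per-partition overhead mentioned in the
    // quoted comment (hash table, links, hash values): 8 MiB.
    static final long OVERHEAD = 8L * 1024 * 1024;

    // Halve the partition count until the planned memory footprint fits,
    // mirroring the adjustment loop in delayedSetup(). Bottoms out at 1.
    static int fitPartitions(int numPartitions, long estMaxBatchSize,
                             int minBatchesPerPartition, long memAvail) {
        while (numPartitions * (estMaxBatchSize * minBatchesPerPartition + OVERHEAD) > memAvail) {
            numPartitions /= 2;
            if (numPartitions < 2) {
                break; // 2nd phase needs at least 2 partitions to make progress
            }
        }
        return numPartitions;
    }

    public static void main(String[] args) {
        // 32 partitions, ~4 MB batches, 3 batches each, 256 MB available -> 8
        System.out.println(fitPartitions(32, 4_000_000L, 3, 256_000_000L));
    }
}
```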
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032210#comment-16032210 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119500755 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; +spilledPartitionsList = new ArrayList(); + +plannedBatches = numPartitions; // each partition should allocate its first batch + +// initialize every (per partition) entry in the arrays +for (int i = 0; i < numPartitions; i++ ) { + try { +this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); +this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (IllegalStateException ise) {} // ignore + catch (Exception e) { throw new DrillRuntimeException(e); }
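The `partitionMask` / `bitsInMask` pair set up in the quoted code enables cheap hash routing: with a power-of-two partition count, the low bits of a hash value pick the partition and the remaining high bits stay usable downstream. A sketch (whether `HashAggTemplate` consumes the leftover bits exactly this way is an assumption):

```java
public class PartitionRouting {
    // The low bitsInMask bits of the hash choose the partition.
    static int partitionOf(int hash, int numPartitions) {
        int partitionMask = numPartitions - 1; // e.g. 32 -> 0x1F
        return hash & partitionMask;
    }

    // The remaining high bits stay available, e.g. for the per-partition
    // hash table; unsigned shift drops the partition-selection bits.
    static int remainingHash(int hash, int numPartitions) {
        int bitsInMask = Integer.bitCount(numPartitions - 1); // 0x1F -> 5
        return hash >>> bitsInMask;
    }

    public static void main(String[] args) {
        System.out.println(partitionOf(0xCAFEBABE, 32)); // prints 30
    }
}
```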
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032013#comment-16032013 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119481421 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; +spilledPartitionsList = new ArrayList(); + +plannedBatches = numPartitions; // each partition should allocate its first batch + +// initialize every (per partition) entry in the arrays +for (int i = 0; i < numPartitions; i++ ) { + try { +this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); +this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (IllegalStateException ise) {} // ignore + catch (Exception e) { throw new DrillRuntimeException(e); }
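The per-partition setup loop quoted above swallows `IllegalStateException` and rewraps any other checked exception as a `DrillRuntimeException`, so `delayedSetup()` needs no `throws` clause. The pattern in isolation (the wrapper class and factory interface here are illustrative stand-ins, not Drill types):

```java
public class SetupExceptionWrapping {
    // Stand-in for org.apache.drill.common.exceptions.DrillRuntimeException.
    static class RuntimeWrapper extends RuntimeException {
        RuntimeWrapper(Throwable cause) { super(cause); }
    }

    interface Factory { Object create() throws Exception; }

    // Ignore a benign IllegalStateException (as the quoted "// ignore" does);
    // rethrow everything else unchecked.
    static Object setupOne(Factory f) {
        try {
            return f.create();
        } catch (IllegalStateException ise) {
            return null; // ignored, matching the quoted catch block
        } catch (Exception e) {
            throw new RuntimeWrapper(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(setupOne(() -> "ht")); // prints ht
    }
}
```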
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030546#comment-16030546 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119260986 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; --- End diff -- One reason for this is minimizing the code change -- the new code resembles the old code, only using arrays instead of scalars. > Support Spill to Disk for the Hash Aggregate Operator > - > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators >Affects Versions: 1.10.0 >Reporter: Boaz Ben-Zvi >
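The reviewer's point above is that the refactoring replaces each scalar of aggregation state with an array indexed by partition, keeping the new code shaped like the old. A minimal sketch of that layout (types are placeholders; `String` stands in for `BatchHolder` and spill-file handles). Using a list of lists also sidesteps the unchecked `(ArrayList[]) new ArrayList[...]` cast the quoted code needs:

```java
import java.util.ArrayList;
import java.util.List;

public class PerPartitionState {
    final List<List<String>> batchHolders = new ArrayList<>(); // was: one ArrayList
    final int[] outBatchIndex;                                 // was: one int
    final String[] spillFiles;                                 // was: one String

    PerPartitionState(int numPartitions) {
        outBatchIndex = new int[numPartitions];
        spillFiles = new String[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            batchHolders.add(new ArrayList<>()); // each partition starts empty
        }
    }

    public static void main(String[] args) {
        PerPartitionState s = new PerPartitionState(8);
        System.out.println(s.batchHolders.size()); // prints 8
    }
}
```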
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030540#comment-16030540 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119260746 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); --- End diff -- Done. > Support Spill to Disk for the Hash Aggregate Operator > - > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators >Affects Versions: 1.10.0 >Reporter: Boaz Ben-Zvi >Assignee: Boaz Ben-Zvi > Fix For: 1.11.0 > > > Support gradual spilling memory to disk as the available memory gets too > small to allow in memory work for the Hash Aggregate Operator. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030538#comment-16030538 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119260493 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress --- End diff -- Added: `logger.warn("Spilling was disabled - not enough memory available for internal partitioning");` > Support Spill to Disk for the Hash Aggregate Operator > - > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators >Affects Versions: 1.10.0 >Reporter: Boaz Ben-Zvi >Assignee: Boaz Ben-Zvi > Fix For: 1.11.0 > > > Support gradual spilling memory to disk as the available memory gets too > small to allow in memory work for the Hash Aggregate Operator. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030522#comment-16030522 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119259465 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch --- End diff -- I think some test case (with an empty batch) failed there; anyway does not hurt. 
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030520#comment-16030520 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119259090 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} --- End diff -- Done. 
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030513#comment-16030513 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119258769 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -230,15 +452,35 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme throw new IllegalArgumentException("Wrong number of workspace variables."); } -//this.context = context; +this.context = context; this.stats = stats; -this.allocator = allocator; +this.allocator = oContext.getAllocator(); +this.oContext = oContext; this.incoming = incoming; -//this.schema = incoming.getSchema(); this.outgoing = outgoing; this.outContainer = outContainer; +this.operatorId = hashAggrConfig.getOperatorId(); + +is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2; +isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1; +canSpill = isTwoPhase; // single phase can not spill --- End diff -- Three booleans is 2^3 ( == 8), and this space is sparse as there are dependencies (e.g. Single phase can not spill, and has no 1st/2nd). Those booleans are used only in a couple of places in the code to make a logical difference, hence it'll be an overkill to subclass based on these changes. 
They are used in many other places for logging messages.
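The flag derivation discussed above can be sketched in isolation. This hypothetical mirror shows how the three booleans fall out of a single phase enum (the quoted code names `PHASE_1of1` and `PHASE_2of2`; `PHASE_1of2` is an assumed name for the first half of a two-phase plan):

```java
public class AggPhaseFlags {
    // Illustrative mirror of AggPrelBase.OperatorPhase, not the Drill enum.
    enum OperatorPhase { PHASE_1of1, PHASE_1of2, PHASE_2of2 }

    final boolean is2ndPhase;
    final boolean isTwoPhase;
    final boolean canSpill;

    AggPhaseFlags(OperatorPhase phase) {
        is2ndPhase = phase == OperatorPhase.PHASE_2of2;
        isTwoPhase = phase != OperatorPhase.PHASE_1of1;
        canSpill   = isTwoPhase; // per the quoted code: single phase cannot spill
    }

    public static void main(String[] args) {
        System.out.println(new AggPhaseFlags(OperatorPhase.PHASE_1of1).canSpill); // prints false
    }
}
```

Of the 2^3 = 8 boolean combinations, only three are reachable, which is the reviewer's argument that subclassing per combination would be overkill.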
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030505#comment-16030505 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119258161
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -204,24 +293,157 @@ private int getNumPendingOutput() {
     @RuntimeOverridden
     public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
-        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException {
     }

     @RuntimeOverridden
-    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
     }

     @RuntimeOverridden
-    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
     }
   }

+  /**
+   * An internal class to replace "incoming" - instead scanning a spilled partition file
+   */
+  public class SpilledRecordbatch implements CloseableRecordBatch {
+    private VectorContainer container = null;
+    private InputStream spillStream;
+    private int spilledBatches;
+    private FragmentContext context;
+    private BatchSchema schema;
+    private OperatorContext oContext;
+    // Path spillStreamPath;
+    private String spillFile;
+    VectorAccessibleSerializable vas;
+
+    public SpilledRecordbatch(String spillFile, /* Path spillStreamPath, */ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext) {
+      this.context = context;
+      this.schema = schema;
+      this.spilledBatches = spilledBatches;
+      this.oContext = oContext;
+      // this.spillStreamPath = spillStreamPath;
+      this.spillFile = spillFile;
+      vas = new VectorAccessibleSerializable(allocator);
+      container = vas.get();
+
+      try {
+        this.spillStream = spillSet.openForInput(spillFile);
+      } catch (IOException e) { throw new RuntimeException(e); }
+
+      next(); // initialize the container
+    }
+
+    @Override
+    public SelectionVector2 getSelectionVector2() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SelectionVector4 getSelectionVector4() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TypedFieldId getValueVectorId(SchemaPath path) {
+      return container.getValueVectorId(path);
+    }
+
+    @Override
+    public VectorWrapper getValueAccessorById(Class clazz, int... ids) {
+      return container.getValueAccessorById(clazz, ids);
+    }
+
+    @Override
+    public Iterator iterator() {
+      return container.iterator();
+    }
+
+    @Override
+    public FragmentContext getContext() { return context; }
+
+    @Override
+    public BatchSchema getSchema() { return schema; }
+
+    @Override
+    public WritableBatch getWritableBatch() {
+      return WritableBatch.get(this);
+    }
+
+    @Override
+    public VectorContainer getOutgoingContainer() { return container; }
+
+    @Override
+    public int getRecordCount() { return container.getRecordCount(); }
+
+    @Override
+    public void kill(boolean sendUpstream) {
+      this.close(); // delete the current spill file
+    }
+
+    /**
+     * Read the next batch from the spill file
+     *
+     * @return IterOutcome
+     */
+    @Override
+    public IterOutcome next() {
--- End diff --
HashAgg is unique in the way it reads (and processes) the spilled batches exactly like reading (and processing) the incoming batches. Its actual code footprint is quite small (mostly in the next() method). SpilledRun extends BatchGroup, and seems to have more logic. The two seem to have too many differences to be worth combining.
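The next() pattern above can be sketched independently of Drill's vector classes. This is a minimal illustration under stated assumptions: a DataInputStream of ints stands in for the serialized batches that VectorAccessibleSerializable would actually deserialize, and the class and field names are illustrative, not Drill's.

```java
// Sketch of a batch source that replays batches from a spill stream until the
// recorded batch count is exhausted, then reports NONE -- mirroring how
// SpilledRecordbatch lets the aggregator treat spilled data like "incoming".
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class SpillReplay {
    public enum IterOutcome { OK, NONE }

    private final DataInputStream spillStream;
    private int spilledBatches;   // batches remaining in the spill file
    public int current;           // stand-in for the current deserialized batch

    public SpillReplay(InputStream in, int spilledBatches) {
        this.spillStream = new DataInputStream(in);
        this.spilledBatches = spilledBatches;
    }

    // Read the next "batch" from the spill stream, or report end-of-data
    public IterOutcome next() throws IOException {
        if (spilledBatches <= 0) { return IterOutcome.NONE; }
        current = spillStream.readInt();  // real code reads a whole vector batch
        spilledBatches--;
        return IterOutcome.OK;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeInt(7);
        dos.writeInt(8);
        SpillReplay replay = new SpillReplay(new ByteArrayInputStream(bos.toByteArray()), 2);
        while (replay.next() == IterOutcome.OK) {
            System.out.println("read batch value " + replay.current);
        }
    }
}
```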
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030496#comment-16030496 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119257032
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -204,24 +293,157 @@ private int getNumPendingOutput() {
     @RuntimeOverridden
     public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
-        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException {
     }

     @RuntimeOverridden
-    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
     }

     @RuntimeOverridden
-    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
     }
   }

+  /**
+   * An internal class to replace "incoming" - instead scanning a spilled partition file
+   */
+  public class SpilledRecordbatch implements CloseableRecordBatch {
+    private VectorContainer container = null;
--- End diff --
No -- the next() method reassigns this container (when reading the next batch from the spill file).
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030495#comment-16030495 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119256961
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -204,24 +293,157 @@ private int getNumPendingOutput() {
     @RuntimeOverridden
     public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
-        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException {
     }

     @RuntimeOverridden
-    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
     }

     @RuntimeOverridden
-    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
     }
   }

+  /**
+   * An internal class to replace "incoming" - instead scanning a spilled partition file
+   */
+  public class SpilledRecordbatch implements CloseableRecordBatch {
+    private VectorContainer container = null;
--- End diff --
Done.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030493#comment-16030493 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119256700
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -204,24 +293,157 @@ private int getNumPendingOutput() {
     @RuntimeOverridden
     public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
-        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException {
     }

     @RuntimeOverridden
-    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
     }

     @RuntimeOverridden
-    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
     }
   }

+  /**
+   * An internal class to replace "incoming" - instead scanning a spilled partition file
+   */
+  public class SpilledRecordbatch implements CloseableRecordBatch {
--- End diff --
Moved the inner class out into an independent class. (The "static inner class" approach failed due to a possible bug with Drill's bytecode fixup -- the first parameter in the constructor was eliminated in the generated code.) Extending the record batch interface was intentional - it leads to very neat handling of the batch in the Hash Agg: no code change, just like handling the batches incoming from upstream.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030402#comment-16030402 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119245339
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -166,20 +252,23 @@ public BatchHolder() {
     }

     private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
-      updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
+      try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
+      catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
       maxOccupiedIdx = Math.max(maxOccupiedIdx, idxWithinBatch);
       return true;
     }

     private void setup() {
-      setupInterior(incoming, outgoing, aggrValuesContainer);
+      try { setupInterior(incoming, outgoing, aggrValuesContainer); }
+      catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
     }

     private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
       outStartIdxHolder.value = batchOutputCount;
       outNumRecordsHolder.value = 0;
       for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
-        outputRecordValues(i, batchOutputCount);
+        try { outputRecordValues(i, batchOutputCount); }
+        catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
--- End diff --
Done!
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030401#comment-16030401 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119245285
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -166,20 +252,23 @@ public BatchHolder() {
     }

     private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
-      updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
+      try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
+      catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
--- End diff --
Yes, but only when 100% sure; if there is any chance of a SchemaChange, better to use UnsupportedOperationException. Code change done!
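The wrapping pattern in this diff can be shown in isolation. This is a sketch with stub exception classes standing in for Drill's SchemaChangeException and DrillRuntimeException; the `simulateSchemaChange` flag is invented for the example so the wrapping path is actually exercised.

```java
// Sketch: a checked exception declared by (generated) template methods is
// wrapped into an unchecked runtime exception at the call site, so callers
// that cannot declare the checked exception still propagate the failure.
public class WrapDemo {
    static class SchemaChangeException extends Exception {}
    static class DrillRuntimeException extends RuntimeException {
        DrillRuntimeException(Throwable t) { super(t); }
    }

    public static boolean simulateSchemaChange = false;  // test hook, not in Drill

    // stands in for the generated updateAggrValuesInternal
    static void updateAggrValuesInternal(int incomingRowIdx, int htRowIdx)
            throws SchemaChangeException {
        if (simulateSchemaChange) { throw new SchemaChangeException(); }
    }

    // mirrors the diff: wrap the checked exception into an unchecked one
    public static boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
        try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
        catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(updateAggrValues(0, 0));
    }
}
```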
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030372#comment-16030372 ] ASF GitHub Bot commented on DRILL-5457: --- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119241352
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
@@ -136,15 +136,21 @@ public IterOutcome innerNext() {
       return IterOutcome.NONE;
     }

-    if (aggregator.buildComplete() && !aggregator.allFlushed()) {
-      // aggregation is complete and not all records have been output yet
-      return aggregator.outputCurrentBatch();
+    // if aggregation is complete and not all records have been output yet
+    if (aggregator.buildComplete() ||
+        // or: 1st phase needs to return (not fully grouped) partial output due to memory pressure
+        aggregator.earlyOutput()) {
+      // then output the next batch downstream
+      IterOutcome out = aggregator.outputCurrentBatch();
--- End diff --
Done!! Looks so simple in retrospect...
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027239#comment-16027239 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812656
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
       }
     }
-    ChainedHashTable ht =
+    spillSet = new SpillSet(context, hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    baseHashTable =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
-    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
-    batchHolders = new ArrayList();
-    // First BatchHolder is created when the first put request is received.
     doSetup(incoming);
   }

+  /**
+   * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming.
+   * This data is used to compute the number of partitions.
+   */
+  private void delayedSetup() {
+
+    // Set the number of partitions from the configuration (raise to a power of two, if needed)
+    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
+    if ( numPartitions == 1 ) {
+      canSpill = false;
+      logger.warn("Spilling was disabled");
+    }
+    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
+      numPartitions++;
+    }
+    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+    else {
+      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+      updateEstMaxBatchSize(incoming);
+    }
+    long memAvail = memoryLimit - allocator.getAllocatedMemory();
+    if ( !canSpill ) { // single phase, or spill disabled by configuration
+      numPartitions = 1; // single phase should use only a single partition (to save memory)
+    } else { // two phase
+      // Adjust down the number of partitions if needed - when the memory available can not hold as
+      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)
+      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+        numPartitions /= 2;
+        if ( numPartitions < 2) {
+          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
+          break;
+        }
+      }
+    }
+    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+        numPartitions, canSpill ? "Can" : "Cannot");
+
+    // The following initial safety check should be revisited once we can lower the number of rows in a batch
+    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
+    if ( numPartitions == 1 ) {
+      // if too little memory - behave like the old code -- no memory limit for hash aggregate
+      allocator.setLimit(10_000_000_000L);
+    }
+    // Based on the number of partitions: Set the mask and bit count
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    // Create arrays (one entry per partition)
+    htables = new HashTable[numPartitions];
+    batchHolders = (ArrayList[]) new ArrayList[numPartitions];
+    outBatchIndex = new int[numPartitions];
+    outputStream = new OutputStream[numPartitions];
+    spilledBatchesCount = new int[numPartitions];
+    // spilledPaths = new Path[numPartitions];
+    spillFiles = new String[numPartitions];
+    spilledPartitionsList = new ArrayList();
+
+    plannedBatches = numPartitions; // each partition should allocate its first batch
+
+    // initialize every (per partition) entry in the arrays
+    for (int i = 0; i < numPartitions; i++ ) {
+      try {
+        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+        this.htables[i].setMaxVarcharSize(maxColumnWidth);
+      } catch (IllegalStateException ise) {} // ignore
+      catch (Exception e) { throw new DrillRuntimeException(e);
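The partition-count arithmetic in delayedSetup() above is easy to check in isolation. This sketch reproduces just that arithmetic (the rounding loop, mask, and bit count from the diff) with an illustrative class name; it is not the committed Drill code.

```java
// Sketch: round a configured partition count up to a power of two, then
// derive the partition mask and the number of hash bits it consumes.
public class PartitionMath {
    // same loop as in the diff: increment until only one bit is set
    public static int roundUpToPowerOfTwo(int n) {
        while (Integer.bitCount(n) > 1) { n++; }
        return n;
    }

    public static void main(String[] args) {
        int numPartitions = roundUpToPowerOfTwo(12);        // 12 is not a power of 2
        int partitionMask = numPartitions - 1;              // e.g. 32 --> 0x1F
        int bitsInMask = Integer.bitCount(partitionMask);   // e.g. 0x1F -> 5
        System.out.println(numPartitions + " " + partitionMask + " " + bitsInMask);
    }
}
```

Note that the loop is O(gap to the next power of two); `Integer.highestOneBit` could compute the same result in constant time, but the loop matches the diff.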
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027215#comment-16027215 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118811633
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
@@ -136,15 +136,21 @@ public IterOutcome innerNext() {
       return IterOutcome.NONE;
     }

-    if (aggregator.buildComplete() && !aggregator.allFlushed()) {
-      // aggregation is complete and not all records have been output yet
-      return aggregator.outputCurrentBatch();
+    // if aggregation is complete and not all records have been output yet
+    if (aggregator.buildComplete() ||
+        // or: 1st phase needs to return (not fully grouped) partial output due to memory pressure
+        aggregator.earlyOutput()) {
+      // then output the next batch downstream
+      IterOutcome out = aggregator.outputCurrentBatch();
--- End diff --
Since `HashAggregator` is not an operator executor (AKA record batch), it does not have to follow the iterator protocol and use the `IterOutcome` enum. Instead, you can define your own. You won't need the `OK_NEW_SCHEMA`, `OUT_OF_MEMORY`, `FAIL` or `NOT_YET` values; all you seem to need is `OK`, `NONE` and `RESTART`. This approach avoids the need to change the `IterOutcome` enum and expose your states to all of the Drill iterator protocol. I did something similar in Sort for the iterator class that returns either in-memory or merged spilled batches.
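The suggestion above can be sketched as a small aggregator-local enum plus a mapping back to the operator protocol. The value names AGG_OK and RESTART follow the AggIterOutcome references that appear elsewhere in this thread; the mapping strings and class name are illustrative, not the committed code.

```java
// Sketch: HashAggregator returns its own outcome enum; the record batch
// (HashAggBatch) translates it to the Drill iterator protocol at the boundary.
public class AggOutcomeDemo {
    public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }

    // The caller maps the aggregator-specific outcome to an iterator action.
    public static String mapToIterator(AggIterOutcome out) {
        switch (out) {
            case AGG_OK:      return "IterOutcome.OK";
            case AGG_NONE:    return "IterOutcome.NONE";
            case AGG_RESTART: return "continue: read a spilled partition";
            default:          throw new IllegalStateException("unreachable");
        }
    }

    public static void main(String[] args) {
        for (AggIterOutcome o : AggIterOutcome.values()) {
            System.out.println(o + " -> " + mapToIterator(o));
        }
    }
}
```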
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027246#comment-16027246 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813958
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
   @Override
   public int getOutputCount() {
-    // return outputCount;
     return lastBatchOutputCount;
   }

   @Override
   public void cleanup() {
-    if (htable != null) {
-      htable.clear();
-      htable = null;
-    }
+    if ( schema == null ) { return; } // not set up; nothing to clean
+    for ( int i = 0; i < numPartitions; i++) {
+      if (htables[i] != null) {
+        htables[i].clear();
+        htables[i] = null;
+      }
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = null;
+      }
+
+      // delete any (still active) output spill file
+      if ( outputStream[i] != null && spillFiles[i] != null) {
+        try {
+          spillSet.delete(spillFiles[i]);
+        } catch(IOException e) {
+          logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i]);
+        }
+      }
+    }
+    // delete any spill file left in unread spilled partitions
+    while ( ! spilledPartitionsList.isEmpty() ) {
+      SpilledPartition sp = spilledPartitionsList.remove(0);
+      try {
+        spillSet.delete(sp.spillFile);
+      } catch(IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}", sp.spillFile);
+      }
+    }
+    spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
     outStartIdxHolder = null;
     outNumRecordsHolder = null;
+  }

-    if (batchHolders != null) {
-      for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches),
+  // then reallocate them in pristine state to allow the partition to continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
+    assert htables[part] != null;
+    htables[part].reset();
+    if ( batchHolders[part] != null) {
+      for (BatchHolder bh : batchHolders[part]) {
         bh.clear();
       }
-      batchHolders.clear();
-      batchHolders = null;
+      batchHolders[part].clear();
     }
+    batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received.
   }

-  // private final AggOutcome setOkAndReturn() {
-  //   this.outcome = IterOutcome.OK;
-  //   for (VectorWrapper v : outgoing) {
-  //     v.getValueVector().getMutator().setValueCount(outputCount);
-  //   }
-  //   return AggOutcome.RETURN_OUTCOME;
-  // }
   private final void incIndex() {
     underlyingIndex++;
     if (underlyingIndex >= incoming.getRecordCount()) {
       currentIndex = Integer.MAX_VALUE;
       return;
     }
-    currentIndex = getVectorIndex(underlyingIndex);
+    try { currentIndex = getVectorIndex(underlyingIndex); }
+    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
   }

   private final void resetIndex() {
     underlyingIndex = -1;
     incIndex();
   }

-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+    return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return)?
+   * - The current partition (to which a new batch holder is added) has a priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more memory freed.
+   * Need to weigh the above three options.
+   *
+   * @param currPart - The partition that hit the memory limit (gets a priority)
+   * @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionToFlush(int
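The javadoc above describes a weighing heuristic whose body is cut off in this quote. The sketch below is an invented illustration of that kind of policy, not the committed chooseAPartitionToFlush: the scoring weights, data layout, and method names are assumptions made for the example.

```java
// Sketch: pick a partition to flush by weighing (a) the current partition,
// which gets priority, (b) already-spilled partitions, which get a bonus, and
// (c) otherwise the largest partition -- while treating tiny partitions
// (a single batch) as not worth spilling.
public class ChooseFlush {
    public static int choose(int currPart, int[] batchCount, boolean[] spilled) {
        int best = currPart;
        // doubling the current partition's score expresses its priority
        int bestScore = score(currPart, batchCount, spilled) * 2;
        for (int p = 0; p < batchCount.length; p++) {
            int s = score(p, batchCount, spilled);
            if (s > bestScore) { bestScore = s; best = p; }
        }
        return best;
    }

    // spilling too few rows (e.g. a single batch) gains nothing; spilled
    // partitions get a bonus since appending to an existing spill is cheap
    static int score(int p, int[] batchCount, boolean[] spilled) {
        if (batchCount[p] <= 1) { return 0; }
        return batchCount[p] + (spilled[p] ? 4 : 0);
    }
}
```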
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027252#comment-16027252 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812514
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
       }
     }
-    ChainedHashTable ht =
+    spillSet = new SpillSet(context, hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    baseHashTable =
        new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
-    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
-    batchHolders = new ArrayList();
-    // First BatchHolder is created when the first put request is received.
     doSetup(incoming);
   }

+  /**
+   * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming.
+   * This data is used to compute the number of partitions.
+   */
+  private void delayedSetup() {
+
+    // Set the number of partitions from the configuration (raise to a power of two, if needed)
+    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
+    if ( numPartitions == 1 ) {
+      canSpill = false;
+      logger.warn("Spilling was disabled");
+    }
+    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
+      numPartitions++;
+    }
+    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+    else {
+      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+      updateEstMaxBatchSize(incoming);
+    }
+    long memAvail = memoryLimit - allocator.getAllocatedMemory();
+    if ( !canSpill ) { // single phase, or spill disabled by configuration
+      numPartitions = 1; // single phase should use only a single partition (to save memory)
+    } else { // two phase
+      // Adjust down the number of partitions if needed - when the memory available can not hold as
+      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)
+      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+        numPartitions /= 2;
+        if ( numPartitions < 2) {
+          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
+          break;
+        }
+      }
+    }
+    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+        numPartitions, canSpill ? "Can" : "Cannot");
+
+    // The following initial safety check should be revisited once we can lower the number of rows in a batch
+    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
+    if ( numPartitions == 1 ) {
+      // if too little memory - behave like the old code -- no memory limit for hash aggregate
+      allocator.setLimit(10_000_000_000L);
+    }
+    // Based on the number of partitions: Set the mask and bit count
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    // Create arrays (one entry per partition)
+    htables = new HashTable[numPartitions];
+    batchHolders = (ArrayList[]) new ArrayList[numPartitions];
+    outBatchIndex = new int[numPartitions];
+    outputStream = new OutputStream[numPartitions];
+    spilledBatchesCount = new int[numPartitions];
+    // spilledPaths = new Path[numPartitions];
+    spillFiles = new String[numPartitions];
+    spilledPartitionsList = new ArrayList();
+
+    plannedBatches = numPartitions; // each partition should allocate its first batch
+
+    // initialize every (per partition) entry in the arrays
+    for (int i = 0; i < numPartitions; i++ ) {
+      try {
+        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+        this.htables[i].setMaxVarcharSize(maxColumnWidth);
+      } catch (IllegalStateException ise) {} // ignore
+      catch (Exception e) { throw new DrillRuntimeException(e);
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027254#comment-16027254 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813928 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); +} + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { +SpilledPartition sp = spilledPartitionsList.remove(0); +try { + spillSet.delete(sp.spillFile); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); +} + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } -if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { +assert htables[part] != null; +htables[part].reset(); +if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } +batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received. } -// private final AggOutcome setOkAndReturn() { -//this.outcome = IterOutcome.OK; -//for (VectorWrapper v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -//} -//return AggOutcome.RETURN_OUTCOME; -// } private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } -currentIndex = getVectorIndex(underlyingIndex); +try { currentIndex = getVectorIndex(underlyingIndex); } +catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} } private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private void addBatchHolder() { + private boolean isSpilled(int part) { +return outputStream[part] != null; + } + /** + * Which partition to choose for flushing out (i.e. spill or return) ? 
+ * - The current partition (to which a new bach holder is added) has a priority, + * because its last batch holder is full. + * - Also the largest prior spilled partition has some priority, as it is already spilled; + * but spilling too few rows (e.g. a single batch) gets us nothing. + * - So the largest non-spilled partition has some priority, to get more memory freed. + * Need to weigh the above three options. + * + * @param currPart - The partition that hit the memory limit (gets a priority) + * @return The partition (number) chosen to be spilled + */ + private int chooseAPartitionToFlush(int
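The flush-selection javadoc above weighs three candidates: the current partition, the largest already-spilled partition, and the largest non-spilled partition. A minimal sketch of that weighing might look like the following; the class, method, and threshold names are illustrative, not the actual Drill implementation:

```java
// Hypothetical sketch of the three-way flush heuristic described in the
// javadoc above. rows[i] = rows held by partition i; spilled[i] = whether
// partition i has already spilled. Names and the threshold are illustrative.
public class FlushChooser {
    static final int MIN_SPILL_ROWS = 128;  // spilling fewer rows than ~one batch gains nothing

    static int choose(int currPart, long[] rows, boolean[] spilled) {
        int choice = currPart;  // the current partition has priority: its last batch holder is full
        int largestSpilled = -1;
        int largestFresh = -1;
        for (int i = 0; i < rows.length; i++) {
            if (spilled[i]) {
                if (largestSpilled < 0 || rows[i] > rows[largestSpilled]) { largestSpilled = i; }
            } else if (largestFresh < 0 || rows[i] > rows[largestFresh]) {
                largestFresh = i;
            }
        }
        // Prefer the largest already-spilled partition (appending to its file is
        // cheap), but only if it would flush a non-trivial number of rows.
        if (largestSpilled >= 0 && rows[largestSpilled] > MIN_SPILL_ROWS
                && rows[largestSpilled] > rows[choice]) {
            choice = largestSpilled;
        } else if (largestFresh >= 0 && rows[largestFresh] > rows[choice]) {
            // Otherwise the largest non-spilled partition frees the most memory.
            choice = largestFresh;
        }
        return choice;
    }
}
```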
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027256#comment-16027256 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813651 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -285,8 +648,18 @@ public AggOutcome doWork() { // In the future HashAggregate may also need to perform some actions conditionally // in the outer try block. + assert ! handlingSpills || currentIndex < Integer.MAX_VALUE; + outside: while (true) { + +// This would be called only once - after actual data arrives on incoming +if ( schema == null && incoming.getRecordCount() > 0 ) { --- End diff -- This kind of work is often done in response to the status codes from the upstream operator. It has to handle both OK_NEW_SCHEMA and OK. The schema is defined on the first batch, which arrives with OK_NEW_SCHEMA and typically a row count of 0. It must also handle (and probably fail on) OK_NEW_SCHEMA on subsequent batches. By putting the code here, rather than in the code that calls the upstream `next()`, a reviewer has to reconcile this code with that caller. > Support Spill to Disk for the Hash Aggregate Operator > - > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators >Affects Versions: 1.10.0 >Reporter: Boaz Ben-Zvi >Assignee: Boaz Ben-Zvi > Fix For: 1.11.0 > > > Support gradual spilling memory to disk as the available memory gets too > small to allow in memory work for the Hash Aggregate Operator. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027249#comment-16027249 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118814041 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); +} + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { +SpilledPartition sp = spilledPartitionsList.remove(0); +try { + spillSet.delete(sp.spillFile); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); +} + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } -if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { +assert htables[part] != null; +htables[part].reset(); +if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } +batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received. } -// private final AggOutcome setOkAndReturn() { -//this.outcome = IterOutcome.OK; -//for (VectorWrapper v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -//} -//return AggOutcome.RETURN_OUTCOME; -// } private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } -currentIndex = getVectorIndex(underlyingIndex); +try { currentIndex = getVectorIndex(underlyingIndex); } +catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} } private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private void addBatchHolder() { + private boolean isSpilled(int part) { +return outputStream[part] != null; + } + /** + * Which partition to choose for flushing out (i.e. spill or return) ? 
+ * - The current partition (to which a new bach holder is added) has a priority, + * because its last batch holder is full. + * - Also the largest prior spilled partition has some priority, as it is already spilled; + * but spilling too few rows (e.g. a single batch) gets us nothing. + * - So the largest non-spilled partition has some priority, to get more memory freed. + * Need to weigh the above three options. + * + * @param currPart - The partition that hit the memory limit (gets a priority) + * @return The partition (number) chosen to be spilled + */ + private int chooseAPartitionToFlush(int
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027229#comment-16027229 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812097 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -204,24 +293,157 @@ private int getNumPendingOutput() { @RuntimeOverridden public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing, -@Named("aggrValuesContainer") VectorContainer aggrValuesContainer) { +@Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException { } @RuntimeOverridden -public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { +public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{ } @RuntimeOverridden -public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) { +public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException{ } } + /** + * An internal class to replace "incoming" - instead scanning a spilled partition file + */ + public class SpilledRecordbatch implements CloseableRecordBatch { +private VectorContainer container = null; +private InputStream spillStream; +private int spilledBatches; +private FragmentContext context; +private BatchSchema schema; +private OperatorContext oContext; +// Path spillStreamPath; +private String spillFile; +VectorAccessibleSerializable vas; + +public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext) { + this.context = context; + this.schema = 
schema; + this.spilledBatches = spilledBatches; + this.oContext = oContext; + //this.spillStreamPath = spillStreamPath; + this.spillFile = spillFile; + vas = new VectorAccessibleSerializable(allocator); + container = vas.get(); + + try { +this.spillStream = spillSet.openForInput(spillFile); + } catch (IOException e) { throw new RuntimeException(e);} + + next(); // initialize the container +} + +@Override +public SelectionVector2 getSelectionVector2() { + throw new UnsupportedOperationException(); +} + +@Override +public SelectionVector4 getSelectionVector4() { + throw new UnsupportedOperationException(); +} + +@Override +public TypedFieldId getValueVectorId(SchemaPath path) { + return container.getValueVectorId(path); +} + +@Override +public VectorWrapper getValueAccessorById(Class clazz, int... ids) { + return container.getValueAccessorById(clazz, ids); +} + +@Override +public Iteratoriterator() { + return container.iterator(); +} + +@Override +public FragmentContext getContext() { return context; } + +@Override +public BatchSchema getSchema() { return schema; } + +@Override +public WritableBatch getWritableBatch() { + return WritableBatch.get(this); +} + +@Override +public VectorContainer getOutgoingContainer() { return container; } + +@Override +public int getRecordCount() { return container.getRecordCount(); } + +@Override +public void kill(boolean sendUpstream) { + this.close(); // delete the current spill file +} + +/** + * Read the next batch from the spill file + * + * @return IterOutcome + */ +@Override +public IterOutcome next() { --- End diff -- Ah! It is clear what is happening now. This RecordBatch is a kind of operator: one that reads from disk into the attached container. Quite clever, actually. But probably overly complex and hard to maintain. Maybe what you want is to reuse the `SpilledRun` class from the external sort. Maybe we can generalize that class a bit. 
It already has the logic to associate a spilled run with a file, iterate over rows and so on. (Though, the iterator behavior should be cleaned up...) `SpilledRun` does that without the full
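The "record batch as a disk-reading operator" idea the reviewer describes can be illustrated with a self-contained analogue: an iterator-style reader that replays length-prefixed batches from a stream and reports NONE once the recorded batch count is exhausted. This deliberately avoids Drill's real classes (VectorAccessibleSerializable, SpillSet); it only mirrors the control flow of `SpilledRecordbatch.next()`:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Simplified stand-in for a spilled-run reader: each "batch" is a
// length-prefixed byte block. The real Drill reader deserializes value
// vectors instead; this sketch only mirrors its next()/NONE protocol.
public class SpillReader {
    enum Outcome { OK, NONE }

    private final DataInputStream in;
    private int remainingBatches;   // recorded when the partition was spilled
    private byte[] current;         // last batch read

    SpillReader(InputStream in, int batches) {
        this.in = new DataInputStream(in);
        this.remainingBatches = batches;
    }

    Outcome next() {
        if (remainingBatches == 0) {
            // like the real reader, release the file once fully replayed
            try { in.close(); } catch (IOException e) { throw new RuntimeException(e); }
            return Outcome.NONE;
        }
        try {
            int len = in.readInt();       // batch is length-prefixed on the way out
            current = new byte[len];
            in.readFully(current);
        } catch (IOException e) { throw new RuntimeException(e); }  // same wrapping as the diff
        remainingBatches--;
        return Outcome.OK;
    }

    byte[] batch() { return current; }
}
```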
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027217#comment-16027217 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812353 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; --- End diff -- This is a bit old-school. Might as well use the power of OO to simplify the code. 
Rather than six arrays which must be kept in sync, perhaps one array that contains instances of a class, where the class members are the various per-partition state variables.
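The per-partition state object the reviewer suggests could look roughly like this. The class itself is hypothetical; the field names mirror the six parallel arrays in the diff, with placeholder types standing in for Drill's HashTable and BatchHolder:

```java
// Sketch of collapsing htables[], batchHolders[], outBatchIndex[],
// outputStream[], spilledBatchesCount[], and spillFiles[] into one
// per-partition object. Types are placeholders, not real Drill classes.
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

public class HashAggPartition {
    Object hashTable;                       // stands in for HashTable
    List<Object> batchHolders = new ArrayList<>();  // stands in for ArrayList<BatchHolder>
    int outBatchIndex;
    OutputStream outputStream;              // non-null once this partition has spilled
    int spilledBatchCount;
    String spillFile;

    boolean isSpilled() { return outputStream != null; }

    // Free and recreate the in-memory state so the partition can keep
    // receiving rows (cf. reinitPartition() in the diff).
    void reinit() {
        batchHolders = new ArrayList<>();
        outBatchIndex = 0;
    }
}
```

With this, the operator holds a single `HashAggPartition[] partitions = new HashAggPartition[numPartitions];` and the invariant "these six arrays are the same length and indexed the same way" disappears.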
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027259#comment-16027259 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813870 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); +} + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { +SpilledPartition sp = spilledPartitionsList.remove(0); +try { + spillSet.delete(sp.spillFile); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); +} + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } -if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { +assert htables[part] != null; +htables[part].reset(); +if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } +batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received. 
} -// private final AggOutcome setOkAndReturn() { -//this.outcome = IterOutcome.OK; -//for (VectorWrapper v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -//} -//return AggOutcome.RETURN_OUTCOME; -// } private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } -currentIndex = getVectorIndex(underlyingIndex); +try { currentIndex = getVectorIndex(underlyingIndex); } +catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} --- End diff -- `IllegalStateException`
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027241#comment-16027241 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813677 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -285,8 +648,18 @@ public AggOutcome doWork() { // In the future HashAggregate may also need to perform some actions conditionally // in the outer try block. + assert ! handlingSpills || currentIndex < Integer.MAX_VALUE; + outside: while (true) { + +// This would be called only once - after actual data arrives on incoming +if ( schema == null && incoming.getRecordCount() > 0 ) { + this.schema = incoming.getSchema(); + // Calculate the number of partitions based on actual incoming data + delayedSetup(); +} + --- End diff -- Comment on original code below: no need for the EXTRA_DEBUG_1 and EXTRA_DEBUG_2: just use trace level logging and avoid the extra complexity.
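The "just use trace level logging" suggestion can be sketched as follows. The example uses the JDK logger so it is self-contained; Drill itself uses slf4j, where the same idiom is `logger.trace(...)` optionally guarded by `logger.isTraceEnabled()`. The `describeBatch` helper is illustrative, not a Drill method:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Replacing compile-time EXTRA_DEBUG_1 / EXTRA_DEBUG_2 constants with
// level-gated logging: detail output is controlled at runtime by the
// logger configuration instead of by editing flags in the source.
public class TraceDemo {
    private static final Logger logger = Logger.getLogger(TraceDemo.class.getName());

    // Possibly expensive message construction, so it is worth gating.
    static String describeBatch(int rows) {
        return "batch with " + rows + " rows";
    }

    static void processBatch(int rows) {
        // Instead of: if (EXTRA_DEBUG_1) { logger.debug(...); }
        if (logger.isLoggable(Level.FINEST)) {
            logger.finest(describeBatch(rows));
        }
    }
}
```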
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027226#comment-16027226 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812409 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; +spilledPartitionsList = new ArrayList(); + +plannedBatches = numPartitions; // each partition should allocate its first batch + +// initialize every (per partition) entry in the arrays +for (int i = 0; i < numPartitions; i++ ) { + try { +this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); +this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (IllegalStateException ise) {} // ignore + catch (Exception e) { throw new DrillRuntimeException(e);
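The partition-count logic in `delayedSetup()` above has two steps: round the configured count up to a power of two, then halve it while the estimated memory need exceeds what is available. A self-contained sketch (the 8 MB per-partition overhead constant comes from the diff; the method and parameter names are illustrative):

```java
// Sketch of the numPartitions computation in delayedSetup(). The real code
// additionally disables spilling when n drops below 2 in the 2nd phase.
public class PartitionSizer {
    static final long OVERHEAD = 8L * 1024 * 1024;  // hash table, links, hash values, etc.

    static int size(int configured, long estMaxBatchSize,
                    long minBatchesPerPartition, long memAvail) {
        int n = configured;
        while (Integer.bitCount(n) > 1) { n++; }  // round up to a power of 2
        // Halve while the available memory cannot hold the configured number
        // of batches per partition plus per-partition overhead.
        while ((long) n * (estMaxBatchSize * minBatchesPerPartition + OVERHEAD) > memAvail) {
            n /= 2;
            if (n < 2) { break; }  // too tight: caller may then disable spilling
        }
        return Math.max(n, 1);
    }
}
```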
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027233#comment-16027233 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812498 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + +// Create arrays (one entry per partition) +htables = new HashTable[numPartitions] ; +batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; +outBatchIndex = new int[numPartitions] ; +outputStream = new OutputStream[numPartitions]; +spilledBatchesCount = new int[numPartitions]; +// spilledPaths = new Path[numPartitions]; +spillFiles = new String[numPartitions]; +spilledPartitionsList = new ArrayList(); + +plannedBatches = numPartitions; // each partition should allocate its first batch + +// initialize every (per partition) entry in the arrays +for (int i = 0; i < numPartitions; i++ ) { + try { +this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); +this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (IllegalStateException ise) {} // ignore + catch (Exception e) { throw new DrillRuntimeException(e);
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027261#comment-16027261 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813868 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); +} + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { +SpilledPartition sp = spilledPartitionsList.remove(0); +try { + spillSet.delete(sp.spillFile); +} catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); +} + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } -if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { --- End diff -- Method on the partition state class > Support Spill to Disk for the Hash Aggregate Operator > - > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators >Affects Versions: 1.10.0 >Reporter: Boaz Ben-Zvi >Assignee: Boaz Ben-Zvi > Fix For: 1.11.0 > > > Support gradual spilling memory to disk as the available memory gets too > small to allow in memory work for the Hash Aggregate Operator. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
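The review comment above suggests making `reinitPartition` a method on a partition state class. A minimal sketch of that refactoring, under the assumption that the per-partition arrays (`htables[i]`, `batchHolders[i]`, `spillFiles[i]`, `spilledBatchesCount[i]`) become fields of one object; `PartitionState`, `reinit()` and the `List<Object>` stand-in are illustrative names, not Drill's API:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionStateDemo {
  // Hypothetical per-partition state object, as the reviewer proposes.
  static class PartitionState {
    List<Object> batchHolders = new ArrayList<>(); // stand-in for the BatchHolder list
    String spillFile;                              // spill file path; null while in memory
    int spilledBatchesCount;

    boolean isSpilled() { return spillFile != null; }

    // Free the partition's in-memory state, then restore a pristine empty state
    // so the partition can continue receiving rows (cf. reinitPartition above).
    void reinit() {
      batchHolders.clear();
      batchHolders = new ArrayList<>(); // first BatchHolder is created on the first put
      spilledBatchesCount = 0;
    }
  }
}
```

With this shape, `cleanup()` becomes a loop that calls one method per partition instead of indexing parallel arrays.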
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027221#comment-16027221 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118811998 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -204,24 +293,157 @@ private int getNumPendingOutput() { @RuntimeOverridden public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing, -@Named("aggrValuesContainer") VectorContainer aggrValuesContainer) { +@Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException { } @RuntimeOverridden -public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { +public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{ } @RuntimeOverridden -public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) { +public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException{ } } + /** + * An internal class to replace "incoming" - instead scanning a spilled partition file + */ + public class SpilledRecordbatch implements CloseableRecordBatch { +private VectorContainer container = null; --- End diff -- No need to initialize to null; Java does that for you for member variables (AKA fields.) 
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027240#comment-16027240 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813748 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -306,14 +685,29 @@ public AggOutcome doWork() { while (true) { // Cleanup the previous batch since we are done processing it. +long pre = allocator.getAllocatedMemory(); for (VectorWrapper v : incoming) { v.getValueVector().clear(); } +long beforeAlloc = allocator.getAllocatedMemory(); + +// Get the next RecordBatch from the incoming IterOutcome out = outgoing.next(0, incoming); + +// If incoming batch is bigger than our estimate - adjust the estimate +long afterAlloc = allocator.getAllocatedMemory(); +long incomingBatchSize = afterAlloc - beforeAlloc; +if ( /* ! handlingSpills && */ estMaxBatchSize < incomingBatchSize ) { + logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}",incomingBatchSize,estMaxBatchSize); + estMaxBatchSize = incomingBatchSize; +} + if (EXTRA_DEBUG_1) { logger.debug("Received IterOutcome of {}", out); } switch (out) { + case RESTART: +logger.warn("HASH AGG: doWork got a RESTART..."); --- End diff -- Shouldn't this throw an `IllegalStateException`? Something is terribly wrong if we get this where it is not expected. Simply logging zillions of messages to the log is not likely to call attention to the problem. 
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027228#comment-16027228 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812287 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars)
+ updateEstMaxBatchSize(incoming);
+}
+long memAvail = memoryLimit - allocator.getAllocatedMemory();
+if ( !canSpill ) { // single phase, or spill disabled by configuration
+ numPartitions = 1; // single phase should use only a single partition (to save memory)
+} else { // two phase
+ // Adjust down the number of partitions if needed - when the memory available can not hold as
+ // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)
+ while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+  numPartitions /= 2;
+  if ( numPartitions < 2) {
+   if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
--- End diff --
Maybe log the problem to help track down issues on a production system?
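The `delayedSetup()` logic quoted above can be isolated as a pure function: round the configured partition count up to a power of two, then halve it while the estimated per-partition cost (batches plus roughly 8 MB of hash-table overhead) exceeds available memory. The names and the 8 MB constant mirror the quoted diff; this standalone sketch is illustrative, not Drill code:

```java
public class PartitionSizing {
  static int choosePartitionCount(int configured, long estMaxBatchSize,
                                  int minBatchesPerPartition, long memAvail) {
    int numPartitions = configured;
    while (Integer.bitCount(numPartitions) > 1) { // round up to a power of 2
      numPartitions++;
    }
    // Halve while the partitions (batches plus ~8 MB overhead each) cannot fit.
    while (numPartitions * (estMaxBatchSize * minBatchesPerPartition + 8L * 1024 * 1024) > memAvail) {
      numPartitions /= 2;
      if (numPartitions < 2) {
        break; // too little memory; the caller may have to disable spilling
      }
    }
    return numPartitions;
  }
}
```

For example, a configured count of 5 is first rounded up to 8, and with a 1 MB batch estimate, 2 batches per partition, and 64 MB available, the loop settles on 4 partitions.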
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027260#comment-16027260 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813787 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 @Override
 public int getOutputCount() {
-// return outputCount;
 return lastBatchOutputCount;
 }
 @Override
 public void cleanup() {
-if (htable != null) {
- htable.clear();
- htable = null;
-}
+ if ( schema == null ) { return; } // not set up; nothing to clean
+ for ( int i = 0; i < numPartitions; i++) {
+  if (htables[i] != null) {
+   htables[i].clear();
+   htables[i] = null;
+  }
+  if ( batchHolders[i] != null) {
+   for (BatchHolder bh : batchHolders[i]) {
+    bh.clear();
+   }
+   batchHolders[i].clear();
+   batchHolders[i] = null;
+  }
+
+  // delete any (still active) output spill file
+  if ( outputStream[i] != null && spillFiles[i] != null) {
+   try {
+    spillSet.delete(spillFiles[i]);
--- End diff --
Need to close the `outputStream[i]` before deleting the file?
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027222#comment-16027222 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812151 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -230,15 +452,35 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
 throw new IllegalArgumentException("Wrong number of workspace variables.");
 }
-//this.context = context;
+this.context = context;
 this.stats = stats;
-this.allocator = allocator;
+this.allocator = oContext.getAllocator();
+this.oContext = oContext;
 this.incoming = incoming;
-//this.schema = incoming.getSchema();
 this.outgoing = outgoing;
 this.outContainer = outContainer;
+this.operatorId = hashAggrConfig.getOperatorId();
+
+is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
+isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+canSpill = isTwoPhase; // single phase can not spill
--- End diff --
All this state is copied for each generated class. But, nothing here is specific to the types of the vectors in the batches. As noted above, all this stuff should be factored out into a non-template class with the template holding only that code which is type-specific.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027237#comment-16027237 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812448 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch +else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); +} +long memAvail = memoryLimit - allocator.getAllocatedMemory(); +if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) +} else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { +numPartitions /= 2; +if ( numPartitions < 2) { + if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress + break; +} + } +} +logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", +numPartitions, canSpill ? "Can" : "Cannot"); + +// The following initial safety check should be revisited once we can lower the number of rows in a batch +// In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) +if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(10_000_000_000L); +} +// Based on the number of partitions: Set the mask and bit count +partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F +bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5
+
+// Create arrays (one entry per partition)
+htables = new HashTable[numPartitions] ;
+batchHolders = (ArrayList[]) new ArrayList[numPartitions] ;
+outBatchIndex = new int[numPartitions] ;
+outputStream = new OutputStream[numPartitions];
+spilledBatchesCount = new int[numPartitions];
+// spilledPaths = new Path[numPartitions];
+spillFiles = new String[numPartitions];
+spilledPartitionsList = new ArrayList();
+
+plannedBatches = numPartitions; // each partition should allocate its first batch
+
+// initialize every (per partition) entry in the arrays
+for (int i = 0; i < numPartitions; i++ ) {
+ try {
+  this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+  this.htables[i].setMaxVarcharSize(maxColumnWidth);
+ } catch (IllegalStateException ise) {} // ignore
+ catch (Exception e) { throw new DrillRuntimeException(e);
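The mask arithmetic quoted in the diff above (`partitionMask = numPartitions - 1; bitsInMask = Integer.bitCount(partitionMask)`) relies on `numPartitions` being a power of two, so `hash & partitionMask` keeps the low `bitsInMask` bits as a partition index. A small self-contained illustration of that technique (not Drill's actual hashing code):

```java
public class PartitionMaskDemo {
  // With numPartitions a power of two, the low bits of the hash pick a
  // partition index in the range [0, numPartitions).
  static int partitionOf(int hashCode, int numPartitions) {
    int partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
    return hashCode & partitionMask;
  }

  static int bitsInMask(int numPartitions) {
    return Integer.bitCount(numPartitions - 1); // e.g. 0x1F -> 5
  }
}
```

Masking this way is cheaper than `hash % numPartitions` and never yields a negative index for a negative hash, which is why the diff insists on rounding the partition count up to a power of two first.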
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027235#comment-16027235 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118804514 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
@@ -64,6 +64,12 @@
 String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
 String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
+ // Spill Options common to all spilling operators
--- End diff --
Looks like a solution is coming. See DRILL-5547.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027219#comment-16027219 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812261 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } -ChainedHashTable ht = +spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); +baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); -this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - +this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; -batchHolders = new ArrayList(); -// First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + +// Set the number of partitions from the configuration (raise to a power of two, if needed) +numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY); +if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled"); +} +while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2 + numPartitions++; +} +if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch --- End diff -- Not sure this is even legal. A record batch must have a schema, even if the schema is an empty set of columns. 
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027220#comment-16027220 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118812194 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -230,15 +452,35 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
 throw new IllegalArgumentException("Wrong number of workspace variables.");
 }
-//this.context = context;
+this.context = context;
 this.stats = stats;
-this.allocator = allocator;
+this.allocator = oContext.getAllocator();
+this.oContext = oContext;
 this.incoming = incoming;
-//this.schema = incoming.getSchema();
 this.outgoing = outgoing;
 this.outContainer = outContainer;
+this.operatorId = hashAggrConfig.getOperatorId();
+
+is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
+isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+canSpill = isTwoPhase; // single phase can not spill
--- End diff --
Here we have three related booleans, or 2^3 = 8 possible combinations. Consider using an enum to identify the (likely much smaller) number of actual cases. Maybe `ONE_PASS, FIRST_PHASE, SECOND_PHASE`? Then if the code does lots of "if this phase do that" kind of logic, it may be handy to have a single base class with common logic, then three (or whatever) subclasses that define the phase-specific logic. Much easier to test and understand.
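The enum the reviewer suggests above could look like the following sketch. The enum constants come from the comment itself; the predicate methods map each phase back to the three booleans in the quoted diff. This is an illustration of the suggestion, not code from the PR:

```java
public class AggPhaseDemo {
  enum AggPhase {
    ONE_PASS,     // single-phase aggregation
    FIRST_PHASE,  // phase 1 of 2
    SECOND_PHASE; // phase 2 of 2

    boolean isTwoPhase() { return this != ONE_PASS; }
    boolean is2ndPhase() { return this == SECOND_PHASE; }
    // Mirrors "canSpill = isTwoPhase" in the diff: single phase can not spill.
    boolean canSpill()   { return isTwoPhase(); }
  }
}
```

With a single enum there are only three legal states instead of eight boolean combinations, and phase-specific behavior can live in per-constant methods or subclasses as the reviewer describes.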
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027244#comment-16027244 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813857 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { -// return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { -if (htable != null) { - htable.clear(); - htable = null; -} + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { +bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { +try { + spillSet.delete(spillFiles[i]); --- End diff -- This code makes me wonder... How many output streams are open at any one time? Drill is a highly concurrent system: we could have 1000s of fragments. If each has, say, a hash agg with 16 partitions, do we run the risk of 16,000 open file handles? Or, is the file handle opened only when needed for reading or writing? In general, as we add more spilling, we may need a global file handle cache that controls the number of open files. The same issue arises in the sort merge phase: all spill files maintain an open file handle; we might exceed some limit. For this PR, consider how long the file handle is open. Perhaps we need to file a JIRA about managing the total number of open files. 
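The comment above proposes "a global file handle cache that controls the number of open files." One conventional way to build such a cap (purely hypothetical here; this is not what Drill implements) is a bounded, access-ordered LRU map that closes the least-recently-used handle when the limit is exceeded:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HandleCacheDemo {
  // Maps a spill-file path to its handle (AutoCloseable as a stand-in for a
  // real stream); closes and evicts the least-recently-used entry over the cap.
  static class HandleCache extends LinkedHashMap<String, AutoCloseable> {
    private final int maxOpen;

    HandleCache(int maxOpen) {
      super(16, 0.75f, true); // accessOrder = true gives LRU iteration order
      this.maxOpen = maxOpen;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, AutoCloseable> eldest) {
      if (size() > maxOpen) {
        try { eldest.getValue().close(); } catch (Exception ignored) { }
        return true; // evict the closed handle
      }
      return false;
    }
  }
}
```

A real implementation would also have to reopen (and seek) an evicted write stream on the next access, which is the main cost of bounding the handle count.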
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027242#comment-16027242 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118811819 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -166,20 +252,23 @@ public BatchHolder() {
 }
 private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
-  updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
+  try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
+  catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
   maxOccupiedIdx = Math.max(maxOccupiedIdx, idxWithinBatch);
   return true;
 }
 private void setup() {
-  setupInterior(incoming, outgoing, aggrValuesContainer);
+  try { setupInterior(incoming, outgoing, aggrValuesContainer); }
+  catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
 }
 private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
   outStartIdxHolder.value = batchOutputCount;
   outNumRecordsHolder.value = 0;
   for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
-    outputRecordValues(i, batchOutputCount);
+    try { outputRecordValues(i, batchOutputCount); }
+    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
--- End diff --
`IllegalStateException`?
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027225#comment-16027225 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118811524 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java ---
@@ -99,17 +100,31 @@ public void setCost(double cost) {
 this.cost = cost;
 }
- // Not available. Presumably because Drill does not currently use
- // this value, though it does appear in some test physical plans.
-// public void setMaxAllocation(long alloc) {
-//maxAllocation = alloc;
-// }
 @Override
 public long getMaxAllocation() {
 return maxAllocation;
 }
+ /**
+ * Any operator that supports spilling should override this method
+ * @param maxAllocation The max memory allocation to be set
+ */
+ @Override
+ public void setMaxAllocation(long maxAllocation) {
--- End diff --
If we just want to fool Jackson we can use `@JsonIgnore`. Would be better to track down the root cause -- whatever it was.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027243#comment-16027243 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813769 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -323,38 +717,32 @@ public AggOutcome doWork() {
 if (EXTRA_DEBUG_1) { logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); }
-//newSchema = true;
 this.cleanup();
 // TODO: new schema case needs to be handled appropriately
 return AggOutcome.UPDATE_AGGREGATOR;
 case OK:
 resetIndex();
-if (incoming.getRecordCount() == 0) {
- continue;
-} else {
- checkGroupAndAggrValues(currentIndex);
- incIndex();
-
- if (EXTRA_DEBUG_1) {
-  logger.debug("Continuing outside loop");
- }
- continue outside;
+
+if (EXTRA_DEBUG_1) {
+ logger.debug("Continuing outside loop");
 }
+continue outside;
 case NONE:
-// outcome = out;
+underlyingIndex = 0; // in case need to handle a spilled partition
+try { currentIndex = getVectorIndex(underlyingIndex); }
+catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);}
--- End diff --
`IllegalStateException`
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027255#comment-16027255 ]

ASF GitHub Bot commented on DRILL-5457:
---

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118813964

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
       @Override
       public int getOutputCount() {
    -    // return outputCount;
         return lastBatchOutputCount;
       }

       @Override
       public void cleanup() {
    -    if (htable != null) {
    -      htable.clear();
    -      htable = null;
    -    }
    +    if ( schema == null ) { return; } // not set up; nothing to clean
    +    for ( int i = 0; i < numPartitions; i++) {
    +      if (htables[i] != null) {
    +        htables[i].clear();
    +        htables[i] = null;
    +      }
    +      if ( batchHolders[i] != null) {
    +        for (BatchHolder bh : batchHolders[i]) {
    +          bh.clear();
    +        }
    +        batchHolders[i].clear();
    +        batchHolders[i] = null;
    +      }
    +
    +      // delete any (still active) output spill file
    +      if ( outputStream[i] != null && spillFiles[i] != null) {
    +        try {
    +          spillSet.delete(spillFiles[i]);
    +        } catch(IOException e) {
    +          logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i]);
    +        }
    +      }
    +    }
    +    // delete any spill file left in unread spilled partitions
    +    while ( ! spilledPartitionsList.isEmpty() ) {
    +      SpilledPartition sp = spilledPartitionsList.remove(0);
    +      try {
    +        spillSet.delete(sp.spillFile);
    +      } catch(IOException e) {
    +        logger.warn("Cleanup: Failed to delete spill file {}", sp.spillFile);
    +      }
    +    }
    +    spillSet.close(); // delete the spill directory(ies)
         htIdxHolder = null;
         materializedValueFields = null;
         outStartIdxHolder = null;
         outNumRecordsHolder = null;
    +  }

    -    if (batchHolders != null) {
    -      for (BatchHolder bh : batchHolders) {
    +  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches),
    +  // then reallocate them in pristine state to allow the partition to continue receiving rows
    +  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
    +    assert htables[part] != null;
    +    htables[part].reset();
    +    if ( batchHolders[part] != null) {
    +      for (BatchHolder bh : batchHolders[part]) {
             bh.clear();
           }
    -      batchHolders.clear();
    -      batchHolders = null;
    +      batchHolders[part].clear();
         }
    +    batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received.
       }

    -  // private final AggOutcome setOkAndReturn() {
    -  //   this.outcome = IterOutcome.OK;
    -  //   for (VectorWrapper v : outgoing) {
    -  //     v.getValueVector().getMutator().setValueCount(outputCount);
    -  //   }
    -  //   return AggOutcome.RETURN_OUTCOME;
    -  // }

       private final void incIndex() {
         underlyingIndex++;
         if (underlyingIndex >= incoming.getRecordCount()) {
           currentIndex = Integer.MAX_VALUE;
           return;
         }
    -    currentIndex = getVectorIndex(underlyingIndex);
    +    try { currentIndex = getVectorIndex(underlyingIndex); }
    +    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
       }

       private final void resetIndex() {
         underlyingIndex = -1;
         incIndex();
       }

    -  private void addBatchHolder() {
    +  private boolean isSpilled(int part) {
    +    return outputStream[part] != null;
    +  }
    +
    +  /**
    +   * Which partition to choose for flushing out (i.e. spill or return)?
    +   * - The current partition (to which a new batch holder is added) has a priority,
    +   *   because its last batch holder is full.
    +   * - Also the largest prior spilled partition has some priority, as it is already spilled;
    +   *   but spilling too few rows (e.g. a single batch) gets us nothing.
    +   * - So the largest non-spilled partition has some priority, to get more memory freed.
    +   * Need to weigh the above three options.
    +   *
    +   * @param currPart - The partition that hit the memory limit (gets a priority)
    +   * @return The partition (number) chosen to be spilled
    +   */
    +  private int chooseAPartitionToFlush(int
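The three-way weighing described in the `chooseAPartitionToFlush()` javadoc can be sketched in isolation. This is not Drill's actual implementation; the method signature, the double weight for already-spilled partitions, and the `minRowsToSpill` cutoff are all invented here for illustration:

```java
// Illustrative sketch of the partition-selection heuristic: prefer the
// current partition, favor a partition that has already spilled (its file
// is already open) if it holds enough rows, and otherwise pick the
// largest in-memory partition to free the most memory.
public class ChooseFlush {
  public static int chooseAPartitionToFlush(int currPart, int[] rowsInPartition,
                                            boolean[] isSpilled, int minRowsToSpill) {
    // The current partition gets first priority: its last batch holder just filled up.
    int best = currPart;
    long bestScore = rowsInPartition[currPart];
    for (int p = 0; p < rowsInPartition.length; p++) {
      if (p == currPart) { continue; }
      // Spilling too few rows (e.g. a single batch) gains nothing.
      if (isSpilled[p] && rowsInPartition[p] < minRowsToSpill) { continue; }
      // Weigh an already-spilled partition double; an in-memory partition
      // competes on its size alone.
      long score = isSpilled[p] ? 2L * rowsInPartition[p] : rowsInPartition[p];
      if (score > bestScore) { best = p; bestScore = score; }
    }
    return best;
  }

  public static void main(String[] args) {
    int[] rows = {100, 900, 300, 50};
    boolean[] spilled = {false, false, true, false};
    // Partition 1 is far larger than the current partition 0, so it wins.
    System.out.println(chooseAPartitionToFlush(0, rows, spilled, 64)); // prints 1
  }
}
```

Ties go to the current partition because the loop only replaces it on a strictly greater score.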
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027257#comment-16027257 ]

ASF GitHub Bot commented on DRILL-5457:
---

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118813733

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -306,14 +685,29 @@ public AggOutcome doWork() {
         while (true) {
           // Cleanup the previous batch since we are done processing it.
    +      long pre = allocator.getAllocatedMemory();
           for (VectorWrapper v : incoming) {
             v.getValueVector().clear();
           }
    +      long beforeAlloc = allocator.getAllocatedMemory();
    +
    +      // Get the next RecordBatch from the incoming
           IterOutcome out = outgoing.next(0, incoming);
    +
    +      // If incoming batch is bigger than our estimate - adjust the estimate
    +      long afterAlloc = allocator.getAllocatedMemory();
    +      long incomingBatchSize = afterAlloc - beforeAlloc;
    +      if ( /* ! handlingSpills && */ estMaxBatchSize < incomingBatchSize ) {
    +        logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize);
    +        estMaxBatchSize = incomingBatchSize;
    +      }
    +
           if (EXTRA_DEBUG_1) {
             logger.debug("Received IterOutcome of {}", out);
           }
           switch (out) {
    +        case RESTART:
    --- End diff --

    This is what ALL operators must do if we add this new status code. They all must check for the code and throw an assertion if it is received.
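The reviewer's point is that an operator which was never written to handle a new status code should fail fast rather than fall through silently. A minimal sketch of that pattern (the enum below is a local stand-in for Drill's `IterOutcome`, and `handle()` is an invented method, not a Drill API):

```java
// Sketch of the fail-fast pattern: any outcome this operator was not
// written for is treated as a bug upstream, not a recoverable condition.
public class OutcomeCheck {
  enum IterOutcome { OK, NONE, NOT_YET, RESTART }

  public static String handle(IterOutcome out) {
    switch (out) {
      case OK:   return "process batch";
      case NONE: return "finish up";
      default:
        // Unknown status codes (e.g. a newly added RESTART) fail loudly.
        throw new IllegalStateException("Unexpected IterOutcome: " + out);
    }
  }

  public static void main(String[] args) {
    System.out.println(handle(IterOutcome.OK)); // prints: process batch
    try {
      handle(IterOutcome.RESTART);
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

A `default` branch that throws is safer than one that returns a placeholder, because adding a new enum constant then surfaces immediately in every operator that has not been updated.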
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027230#comment-16027230 ]

ASF GitHub Bot commented on DRILL-5457:
---

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118812433

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
           }
         }

    -    ChainedHashTable ht =
    +    spillSet = new SpillSet(context, hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
    +    baseHashTable =
             new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
    -    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
    -
    +    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
         numGroupByOutFields = groupByOutFieldIds.length;
    -    batchHolders = new ArrayList();
    -    // First BatchHolder is created when the first put request is received.

         doSetup(incoming);
       }

    +  /**
    +   * Delayed setup covers the parts of setup() that can only run after actual data arrives in incoming.
    +   * This data is used to compute the number of partitions.
    +   */
    +  private void delayedSetup() {
    +
    +    // Set the number of partitions from the configuration (raise to a power of two, if needed)
    +    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
    +    if ( numPartitions == 1 ) {
    +      canSpill = false;
    +      logger.warn("Spilling was disabled");
    +    }
    +    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
    +      numPartitions++;
    +    }
    +    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
    +    else {
    +      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
    +      updateEstMaxBatchSize(incoming);
    +    }
    +    long memAvail = memoryLimit - allocator.getAllocatedMemory();
    +    if ( !canSpill ) { // single phase, or spill disabled by configuration
    +      numPartitions = 1; // single phase should use only a single partition (to save memory)
    +    } else { // two phase
    +      // Adjust down the number of partitions if needed - when the memory available can not hold as
    +      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)
    +      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
    +        numPartitions /= 2;
    +        if ( numPartitions < 2) {
    +          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
    +          break;
    +        }
    +      }
    +    }
    +    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
    +        numPartitions, canSpill ? "Can" : "Cannot");
    +
    +    // The following initial safety check should be revisited once we can lower the number of rows in a batch
    +    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
    +    if ( numPartitions == 1 ) {
    +      // if too little memory - behave like the old code -- no memory limit for hash aggregate
    +      allocator.setLimit(10_000_000_000L);
    +    }
    +    // Based on the number of partitions: Set the mask and bit count
    +    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
    +    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
    +
    +    // Create arrays (one entry per partition)
    +    htables = new HashTable[numPartitions];
    +    batchHolders = (ArrayList[]) new ArrayList[numPartitions];
    +    outBatchIndex = new int[numPartitions];
    +    outputStream = new OutputStream[numPartitions];
    +    spilledBatchesCount = new int[numPartitions];
    +    // spilledPaths = new Path[numPartitions];
    +    spillFiles = new String[numPartitions];
    +    spilledPartitionsList = new ArrayList();
    +
    +    plannedBatches = numPartitions; // each partition should allocate its first batch
    +
    +    // initialize every (per partition) entry in the arrays
    +    for (int i = 0; i < numPartitions; i++ ) {
    +      try {
    +        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
    +        this.htables[i].setMaxVarcharSize(maxColumnWidth);
    +      } catch (IllegalStateException ise) {} // ignore
    +      catch (Exception e) { throw new DrillRuntimeException(e);
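The power-of-two rounding and mask arithmetic in `delayedSetup()` is easy to check in isolation. The class and helper names below are illustrative, not Drill code; only the increment-while-more-than-one-bit-is-set technique is taken from the quoted diff:

```java
// Sketch of the partition-count arithmetic from delayedSetup(): round the
// configured partition count up to a power of two, then derive the bit mask
// used to route each hash value to a partition with a cheap bitwise AND.
public class PartitionMath {
  // Raise n to the next power of two by incrementing while more than
  // one bit is set (the same loop used in the quoted diff).
  public static int roundUpToPowerOf2(int n) {
    int p = n;
    while (Integer.bitCount(p) > 1) { p++; }
    return p;
  }

  public static void main(String[] args) {
    int numPartitions = roundUpToPowerOf2(20);            // 20 -> 32
    int partitionMask = numPartitions - 1;                // 32 -> 0x1F
    int bitsInMask = Integer.bitCount(partitionMask);     // 0x1F -> 5

    // A hash value maps to a partition with a single AND, which is why the
    // count must be a power of two: the mask then covers a contiguous bit range.
    int hash = 0xCAFEBABE;
    int partition = hash & partitionMask;                 // always in [0, numPartitions)
    System.out.println(numPartitions + " 0x" + Integer.toHexString(partitionMask)
        + " " + bitsInMask + " " + partition);
  }
}
```

Note that for an input that is already a power of two the loop body never runs, so the configured value is kept as-is.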
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027236#comment-16027236 ]

ASF GitHub Bot commented on DRILL-5457:
---

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118813700

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -306,14 +685,29 @@ public AggOutcome doWork() {
         while (true) {
           // Cleanup the previous batch since we are done processing it.
    +      long pre = allocator.getAllocatedMemory();
           for (VectorWrapper v : incoming) {
             v.getValueVector().clear();
           }
    +      long beforeAlloc = allocator.getAllocatedMemory();
    +
    --- End diff --

    This function seems overly long: very hard to follow. Perhaps break out each nested loop into a function. Since the functions are called once per batch, the cost of a function call is lost in the noise of operating the iterator hierarchy. The gain, however, is lower mental cost to understand what's happening.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027231#comment-16027231 ]

ASF GitHub Bot commented on DRILL-5457:
---

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118812385

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
    [...]
    +    // initialize every (per partition) entry in the arrays
    +    for (int i = 0; i < numPartitions; i++ ) {
    +      try {
    +        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
    +        this.htables[i].setMaxVarcharSize(maxColumnWidth);
    +      } catch (IllegalStateException ise) {} // ignore
    --- End diff --

    Here, rather than doing a loop that
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027224#comment-16027224 ]

ASF GitHub Bot commented on DRILL-5457:
---

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118811796

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -166,20 +252,23 @@ public BatchHolder() {
         }

         private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
    -      updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
    +      try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
    +      catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
    --- End diff --

    Probably `IllegalStateException` to clearly state that this is a "this should never occur" kind of error.
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027216#comment-16027216 ] ASF GitHub Bot commented on DRILL-5457: --- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118811771 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -18,82 +18,161 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.inject.Named; +import com.google.common.base.Stopwatch; + +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.cache.VectorAccessibleSerializable; import org.apache.drill.exec.compile.sig.RuntimeOverridden; import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; + import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import 
org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.physical.impl.common.HashTableStats; import org.apache.drill.exec.physical.impl.common.IndexPointer; + +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; + +import org.apache.drill.exec.physical.impl.spill.SpillSet; +import org.apache.drill.exec.planner.physical.AggPrelBase; + +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.record.CloseableRecordBatch; + import org.apache.drill.exec.record.MaterializedField; + import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; -import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.BatchSchema; + import org.apache.drill.exec.record.VectorContainer; + +import org.apache.drill.exec.record.TypedFieldId; + +import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + import org.apache.drill.exec.vector.AllocationHelper; + import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ObjectVector; import org.apache.drill.exec.vector.ValueVector; + import org.apache.drill.exec.vector.VariableWidthVector; +import org.apache.hadoop.fs.Path; + +import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE; + --- End diff -- It is impressive how you were able to slide spilling into the existing code structure. Performance and modularity are never required, of course, but it may be worth at least considering them. Putting so much code in a template has a large drawback. Our current byte-code based code generation performs at its worst when templates are large. 
This template is the base for the generated code. In traditional Java, the size of a subclass is independent of the size of the superclass, so if we used "plain old" Java, the size of this template would have no performance impact. But with byte-code manipulation, each generated class contains a complete copy of the byte code for the template class. With a huge template, we make a huge copy every time: we pay a cost in the time it takes to make the copy and then analyze the resulting byte codes, and we also fill up the code cache with many copies of the same template code.
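The refactoring the reviewer hints at can be sketched as follows. This is a hypothetical illustration, not Drill's actual classes: logic that does not depend on generated code is moved into a plain helper class that is compiled once, so the byte-code copy made for each generated template subclass stays small.

```java
// Illustrative sketch (names are invented, not Drill's actual classes):
// heavy shared logic lives in a plain class that code generation never copies.
class SpillHelper {                         // compiled once, never byte-code-copied
  static int choosePartition(int hash, int mask) {
    return hash & mask;                     // shared logic stays out of the template
  }
}

abstract class SmallTemplate {              // only this class gets byte-code-copied
  // the generated subclass overrides just the per-query hooks
  protected abstract int hashOf(int row);

  int partitionOf(int row, int mask) {
    return SpillHelper.choosePartition(hashOf(row), mask);  // delegate out
  }
}

public class TemplateSizeSketch {
  public static void main(String[] args) {
    SmallTemplate t = new SmallTemplate() { // stands in for a generated subclass
      @Override protected int hashOf(int row) { return row * 31; }
    };
    System.out.println(t.partitionOf(5, 0x1F)); // 5*31=155; 155 & 0x1F = 27
  }
}
```

The smaller the abstract template, the less byte code each generated class duplicates; the helper is analyzed and JIT-compiled only once.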
[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027218#comment-16027218 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118812014
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -204,24 +293,157 @@ private int getNumPendingOutput() {

   @RuntimeOverridden
   public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
-      @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+      @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException {
   }

   @RuntimeOverridden
-  public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+  public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
   }

   @RuntimeOverridden
-  public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+  public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
   }
 }

+  /**
+   * An internal class to replace "incoming" - instead scanning a spilled partition file
+   */
+  public class SpilledRecordbatch implements CloseableRecordBatch {
+    private VectorContainer container = null;
--- End diff --
Can this be final? A spilled record batch only ever has one container: the one that it will spill?
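The reviewer's suggestion can be sketched like this. The class and field names below are illustrative stand-ins, not Drill's real types: since the batch only ever holds one container, the field can be `final` and assigned once in the constructor.

```java
// Hypothetical sketch of the "make it final" suggestion (stand-in types):
class VectorContainerStub { }                    // stands in for Drill's VectorContainer

class SpilledRecordbatchSketch {
  private final VectorContainerStub container;   // final: assigned exactly once

  SpilledRecordbatchSketch(VectorContainerStub c) {
    this.container = c;                          // the single container this batch owns
  }

  VectorContainerStub getContainer() { return container; }
}

public class FinalFieldSketch {
  public static void main(String[] args) {
    VectorContainerStub c = new VectorContainerStub();
    SpilledRecordbatchSketch batch = new SpilledRecordbatchSketch(c);
    System.out.println(batch.getContainer() == c);
  }
}
```

A `final` field also documents the invariant to future readers: the compiler rejects any code path that reassigns the container.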
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027227#comment-16027227 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118811987
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
+  public class SpilledRecordbatch implements CloseableRecordBatch {
--- End diff --
This class extends the record batch interface. That interface is *VERY* confusing: it sounds like it is just a "bundle of vectors" that holds records, but it is actually the definition of the Drill Volcano-like iterator protocol - it defines the methods needed to use your SpilledRecordbatch class as an operator. Since this is not an operator, you don't need to extend that interface; in fact, it is not clear you even need a superclass. To hold the vectors, this class has a container member. This class does not need most of the vector-access methods, as it is not an operator; any that are needed can be called on the container itself. Clearly a spilled batch need not follow the `next()` iterator protocol.
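The alternative design the reviewer describes can be sketched as a plain class that owns a container and exposes only what readers of a spilled partition need, with no operator superclass. All names and the in-memory "spill file" below are illustrative assumptions, not Drill's actual API.

```java
// Hypothetical sketch: a spilled-batch reader as a plain class, not an operator.
import java.util.ArrayList;
import java.util.List;

class Container {                       // stand-in for Drill's VectorContainer
  final List<int[]> vectors = new ArrayList<>();
  int recordCount;
}

class SpilledBatchReader {              // no RecordBatch superclass needed
  private final Container container = new Container();

  // loads one spilled batch into the container; returns false past end of file
  boolean readBatch(List<int[]> spillFile, int index) {
    if (index >= spillFile.size()) { return false; }
    container.vectors.clear();
    container.vectors.add(spillFile.get(index));
    container.recordCount = spillFile.get(index).length;
    return true;
  }

  Container getContainer() { return container; }  // callers use the container directly
}

public class SpilledReaderSketch {
  public static void main(String[] args) {
    List<int[]> spillFile = new ArrayList<>();    // stand-in for an on-disk spill file
    spillFile.add(new int[] {1, 2, 3});
    SpilledBatchReader reader = new SpilledBatchReader();
    reader.readBatch(spillFile, 0);
    System.out.println(reader.getContainer().recordCount);
  }
}
```

Dropping the operator interface removes the obligation to implement the `next()` protocol and the many vector-access methods the class would otherwise stub out.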
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027250#comment-16027250 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118814054
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {

   @Override
   public int getOutputCount() {
-    // return outputCount;
     return lastBatchOutputCount;
   }

   @Override
   public void cleanup() {
-    if (htable != null) {
-      htable.clear();
-      htable = null;
-    }
+    if ( schema == null ) { return; } // not set up; nothing to clean
+    for ( int i = 0; i < numPartitions; i++) {
+      if (htables[i] != null) {
+        htables[i].clear();
+        htables[i] = null;
+      }
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = null;
+      }
+
+      // delete any (still active) output spill file
+      if ( outputStream[i] != null && spillFiles[i] != null) {
+        try {
+          spillSet.delete(spillFiles[i]);
+        } catch(IOException e) {
+          logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i]);
+        }
+      }
+    }
+    // delete any spill file left in unread spilled partitions
+    while ( ! spilledPartitionsList.isEmpty() ) {
+      SpilledPartition sp = spilledPartitionsList.remove(0);
+      try {
+        spillSet.delete(sp.spillFile);
+      } catch(IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}", sp.spillFile);
+      }
+    }
+    spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
     outStartIdxHolder = null;
     outNumRecordsHolder = null;
   }

+  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches)
+  // then reallocate them in pristine state to allow the partition to continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
+    assert htables[part] != null;
+    htables[part].reset();
+    if ( batchHolders[part] != null) {
+      for (BatchHolder bh : batchHolders[part]) {
+        bh.clear();
+      }
+      batchHolders[part].clear();
+    }
+    batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received.
+  }

-  // private final AggOutcome setOkAndReturn() {
-  //   this.outcome = IterOutcome.OK;
-  //   for (VectorWrapper v : outgoing) {
-  //     v.getValueVector().getMutator().setValueCount(outputCount);
-  //   }
-  //   return AggOutcome.RETURN_OUTCOME;
-  // }

   private final void incIndex() {
     underlyingIndex++;
     if (underlyingIndex >= incoming.getRecordCount()) {
       currentIndex = Integer.MAX_VALUE;
       return;
     }
-    currentIndex = getVectorIndex(underlyingIndex);
+    try { currentIndex = getVectorIndex(underlyingIndex); }
+    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
   }

   private final void resetIndex() {
     underlyingIndex = -1;
     incIndex();
   }

-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+    return outputStream[part] != null;
+  }
+
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return)?
+   * - The current partition (to which a new batch holder is added) has a priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more memory freed.
+   * Need to weigh the above three options.
+   *
+   * @param currPart - The partition that hit the memory limit (gets a priority)
+   * @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionToFlush(int
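The weighing that the javadoc describes can be sketched as a runnable heuristic. The bonus weights below are illustrative assumptions, not Drill's actual values: the partition that hit the memory limit and any already-spilled partition each get a bonus, and the heaviest candidate is flushed.

```java
// Hypothetical sketch of the "which partition to flush" weighing (weights invented):
public class ChoosePartitionSketch {
  // rowCounts[i] = rows currently held by partition i; spilled[i] = already spilled
  static int choosePartitionToFlush(int currPart, int[] rowCounts, boolean[] spilled) {
    final int CURRENT_BONUS = 32;  // its last batch holder is full
    final int SPILLED_BONUS = 8;   // already has a spill file open
    int best = -1, bestWeight = -1;
    for (int i = 0; i < rowCounts.length; i++) {
      int weight = rowCounts[i];                      // bigger partitions free more memory
      if (i == currPart) { weight += CURRENT_BONUS; }
      if (spilled[i])    { weight += SPILLED_BONUS; }
      if (weight > bestWeight) { bestWeight = weight; best = i; }
    }
    return best;
  }

  public static void main(String[] args) {
    int[] rows = {10, 50, 20, 15};
    boolean[] spilled = {false, false, true, false};
    // partition 1 (50 rows) outweighs partition 0's current-partition bonus (10 + 32)
    System.out.println(choosePartitionToFlush(0, rows, spilled));
  }
}
```

Tuning the two bonuses trades off the three priorities the comment lists: flushing the partition that just filled, reusing an existing spill file, and freeing the most memory.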
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027234#comment-16027234 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118812681
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
     }
   }

-    ChainedHashTable ht =
+    spillSet = new SpillSet(context, hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    baseHashTable =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
-    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
-    batchHolders = new ArrayList();
-    // First BatchHolder is created when the first put request is received.
     doSetup(incoming);
   }

+  /**
+   * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming.
+   * This data is used to compute the number of partitions.
+   */
+  private void delayedSetup() {
+
+    // Set the number of partitions from the configuration (raise to a power of two, if needed)
+    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
+    if ( numPartitions == 1 ) {
+      canSpill = false;
+      logger.warn("Spilling was disabled");
+    }
+    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
+      numPartitions++;
+    }
+    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+    else {
+      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+      updateEstMaxBatchSize(incoming);
+    }
+    long memAvail = memoryLimit - allocator.getAllocatedMemory();
+    if ( !canSpill ) { // single phase, or spill disabled by configuration
+      numPartitions = 1; // single phase should use only a single partition (to save memory)
+    } else { // two phase
+      // Adjust down the number of partitions if needed - when the memory available can not hold as
+      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)
+      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+        numPartitions /= 2;
+        if ( numPartitions < 2) {
+          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
+          break;
+        }
+      }
+    }
+    logger.trace("{} phase. Number of partitions chosen: {}. {} spill",
+        isTwoPhase ? (is2ndPhase ? "2nd" : "1st") : "Single", numPartitions, canSpill ? "Can" : "Cannot");
+
+    // The following initial safety check should be revisited once we can lower the number of rows in a batch
+    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
+    if ( numPartitions == 1 ) {
+      // if too little memory - behave like the old code -- no memory limit for hash aggregate
+      allocator.setLimit(10_000_000_000L);
+    }
+    // Based on the number of partitions: Set the mask and bit count
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    // Create arrays (one entry per partition)
+    htables = new HashTable[numPartitions];
+    batchHolders = (ArrayList[]) new ArrayList[numPartitions];
+    outBatchIndex = new int[numPartitions];
+    outputStream = new OutputStream[numPartitions];
+    spilledBatchesCount = new int[numPartitions];
+    // spilledPaths = new Path[numPartitions];
+    spillFiles = new String[numPartitions];
+    spilledPartitionsList = new ArrayList();
+
+    plannedBatches = numPartitions; // each partition should allocate its first batch
+
+    // initialize every (per partition) entry in the arrays
+    for (int i = 0; i < numPartitions; i++ ) {
+      try {
+        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+        this.htables[i].setMaxVarcharSize(maxColumnWidth);
+      } catch (IllegalStateException ise) {} // ignore
+      catch (Exception e) { throw new DrillRuntimeException(e);
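The partition-count computation in `delayedSetup()` can be sketched as a runnable example. The 8 MB per-partition overhead is the constant the diff shows; the rest is simplified: round the configured count up to a power of two, then halve it while the estimated memory need exceeds what is available.

```java
// Simplified, runnable sketch of the partition-count sizing above:
public class PartitionCountSketch {
  static int choosePartitionCount(int configured, long estMaxBatchSize,
                                  int minBatchesPerPartition, long memAvail) {
    // round up to a power of two (the diff increments instead; same result)
    int numPartitions = Integer.bitCount(configured) > 1
        ? Integer.highestOneBit(configured) * 2
        : configured;
    // halve while batches plus ~8 MB overhead per partition do not fit in memory
    while (numPartitions * (estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail) {
      numPartitions /= 2;
      if (numPartitions < 2) { break; }  // below 2, 2nd phase cannot make progress
    }
    return numPartitions;
  }

  public static void main(String[] args) {
    // 24 rounds up to 32; with 4 MB batches, 3 batches plus 8 MB overhead is 20 MB
    // per partition, so a 200 MB budget supports 8 partitions (32 -> 16 -> 8)
    System.out.println(choosePartitionCount(24, 4L << 20, 3, 200L << 20));
  }
}
```

Keeping the count a power of two is what lets the code derive `partitionMask = numPartitions - 1` and pick a partition with a cheap bitwise AND on the hash value.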
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027223#comment-16027223 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118811955
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
+  public class SpilledRecordbatch implements CloseableRecordBatch {
--- End diff --
This is an inner class: it has an implicit pointer back to the template. As a result, this inner class is copied for each code generation; each hash operation gets not only its own instance, but also its own copy of this class. Is this intended or needed? If not, then start by making this class static. Fix up any implicit references to the outer class. Then, move the class into a separate file so that the byte code manipulation method of CG does not make a copy of this class.
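The implicit pointer the reviewer mentions can be observed directly: a non-static inner class carries a hidden synthetic field (named `this$0` by javac) referencing its enclosing instance, while a static nested class does not. The class names below are illustrative.

```java
// Small runnable illustration of inner vs. static nested classes:
import java.lang.reflect.Field;

public class InnerClassSketch {
  class Inner { }          // non-static: compiler adds a hidden outer-instance field
  static class Nested { }  // static: no hidden reference; can move to its own file

  static boolean hasOuterRef(Class<?> c) {
    for (Field f : c.getDeclaredFields()) {
      // javac names the synthetic outer-instance field this$0 (this$1, ... when nested deeper)
      if (f.isSynthetic() && f.getName().startsWith("this$")) { return true; }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(hasOuterRef(Inner.class));   // inner class keeps an outer reference
    System.out.println(hasOuterRef(Nested.class));  // static nested class does not
  }
}
```

Making the class static removes that hidden field; moving it to its own file then keeps it out of the byte code that code generation copies per query.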
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027248#comment-16027248 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118813975
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027245#comment-16027245 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118814000
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16026477#comment-16026477 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user rchallapalli commented on the issue:
https://github.com/apache/drill/pull/822
Based on my initial testing, I observed that the hash-agg is only using half of the memory allocated. Since the planner by default uses a 2-phase agg, the memory computation logic divides the allocated memory between the 2 hash-agg operators in the plan. This is grossly inefficient, and every test written would need to be modified once this issue gets resolved. Hence I would push for a fix sooner rather than later.
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16025621#comment-16025621 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118616162
--- Diff: exec/java-exec/src/main/resources/drill-module.conf ---
@@ -205,10 +225,10 @@ drill.exec: {
     // Deprecated for managed xsort; used only by legacy xsort
     threshold: 4,
     // File system to use. Local file system by default.
-    fs: "file:///"
+    fs: ${drill.exec.spill.fs},
--- End diff --
Done. Added:

    // -- The two options below can be used to override the options common
    // -- for all spilling operators (see "spill" above).
    // -- This is done for backward compatibility; in the future they
    // -- would be deprecated (you should be using only the common ones)
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16025568#comment-16025568 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user rchallapalli commented on the issue:
https://github.com/apache/drill/pull/822
Based on the current design, if the code senses that there is not sufficient memory, then it falls back to the old code. I have encountered a case where this happened, and the old agg did not respect the memory constraints imposed by me: I gave 116 MB of memory, and the old hash agg code consumed ~130 MB and completed the query. This doesn't play well with the overall resource management plan.
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16025441#comment-16025441 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118592786
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---
@@ -133,6 +133,9 @@ the need to turn off join optimization may go away. */
   public static final BooleanValidator JOIN_OPTIMIZATION = new BooleanValidator("planner.enable_join_optimization", true);
+  // for testing purpose
--- End diff --
@VisibleForTesting annotates methods, but this is a session option. Also (hidden) is the possibility that this option may be used in production, in case some query yields a single-phase hash agg but still has too much data to handle.
[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16025124#comment-16025124 ] ASF GitHub Bot commented on DRILL-5457:
---
Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118545693
--- Diff: exec/java-exec/src/main/resources/drill-module.conf ---
@@ -179,6 +179,26 @@ drill.exec: {
     // Use plain Java compilation where available
     prefer_plain_java: false
   },
+  spill: {
--- End diff --
Added "spill" and "hashagg" sections in the override example file, with some comments:

    spill: {
      # These options are common to all spilling operators.
      # They can be overridden per operator (but this is just for
      # backward compatibility, and may be deprecated in the future)
      directories : [ "/tmp/drill/spill" ],
      fs : "file:///"
    }
    hashagg: {
      # The partitions divide the work inside the hashagg, to ease
      # handling spilling. This initial figure is tuned down when
      # memory is limited.
      # Setting this option to 1 disables spilling!
      num_partitions: 32,
      spill: {
        # The 2 options below override the common ones;
        # they should be deprecated in the future
        directories : [ "/tmp/drill/spill" ],
        fs : "file:///"
      }
    },